Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@

import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM_REMOTE;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM_REMOTE;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM_REMOTE;
import static org.elasticsearch.reindex.UpdateByQueryMetrics.UPDATE_BY_QUERY_TIME_HISTOGRAM;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class ReindexPluginMetricsIT extends ESIntegTestCase {
Expand Down Expand Up @@ -75,33 +81,74 @@ public void testReindexMetrics() throws Exception {
// Use assertBusy to wait for all threads to complete so we get deterministic results
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
});

// Now none of them
createIndex("none");
reindex().source("source").destination("none").filter(termQuery("foo", "no_match")).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(2));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(2));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
});

// Now half of them
reindex().source("source").destination("dest_half").filter(termQuery("foo", "a")).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(3));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(3));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(3));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
});

// Limit with maxDocs
reindex().source("source").destination("dest_size_one").maxDocs(1).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(4));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(4));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(4));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
});
}

/**
 * Runs a local reindex that produces bulk failures and checks that it is reported through the
 * duration and failure histograms only, with nothing recorded on the success or remote histograms.
 */
public void testReindexMetricsWithBulkFailure() throws Exception {
    final String nodeName = internalCluster().startNode();
    final TestTelemetryPlugin telemetry = internalCluster().getInstance(PluginsService.class, nodeName)
        .filterPlugins(TestTelemetryPlugin.class)
        .findFirst()
        .orElseThrow();

    // "test" holds a string in the source index and a number in the destination index, so
    // copying the source document into the destination is rejected at bulk-index time
    indexRandom(true, prepareIndex("source").setId("2").setSource("test", "words words"));
    indexRandom(true, prepareIndex("dest").setId("1").setSource("test", 10));

    var reindexResponse = reindex().source("source").destination("dest").get();
    assertThat(reindexResponse.getBulkFailures().size(), greaterThanOrEqualTo(1));

    // Retry until the measurements have been collected
    assertBusy(() -> {
        telemetry.collect();
        // the single reindex run is timed ...
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
        // ... is not counted as a success ...
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(0));
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
        // ... and is counted exactly once as a (non-remote) failure
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(1));
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
    });
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,66 @@
public class ReindexMetrics {

// Metric names. Each "*_REMOTE" metric is a subset of its overall counterpart: a remote
// reindex is recorded on both the overall and the remote instrument, a local reindex only
// on the overall one.

// Duration of a reindex (registered with unit "seconds").
public static final String REINDEX_TIME_HISTOGRAM = "es.reindex.duration.histogram";
// Duration of a reindex from a remote cluster.
public static final String REINDEX_TIME_HISTOGRAM_REMOTE = "es.reindex.duration.histogram.remote";
// Successful reindex completions; a value of 1 is recorded per completion.
public static final String REINDEX_SUCCESS_HISTOGRAM = "es.reindex.completion.success";
// Successful reindex completions from a remote cluster.
public static final String REINDEX_SUCCESS_HISTOGRAM_REMOTE = "es.reindex.completion.success.remote";
// Failed reindex completions; a value of 1 is recorded per completion.
public static final String REINDEX_FAILURE_HISTOGRAM = "es.reindex.completion.failure";
// Failed reindex completions from a remote cluster.
public static final String REINDEX_FAILURE_HISTOGRAM_REMOTE = "es.reindex.completion.failure.remote";

// Instruments backing the metric names above, in the same order.
private final LongHistogram reindexTimeSecsHistogram;
private final LongHistogram reindexTimeSecsHistogramRemote;
private final LongHistogram reindexSuccessHistogram;
private final LongHistogram reindexSuccessHistogramRemote;
private final LongHistogram reindexFailureHistogram;
private final LongHistogram reindexFailureHistogramRemote;

public ReindexMetrics(MeterRegistry meterRegistry) {
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds"));
}
this.reindexTimeSecsHistogram = meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds");
this.reindexTimeSecsHistogramRemote = meterRegistry.registerLongHistogram(
REINDEX_TIME_HISTOGRAM_REMOTE,
"Time to reindex by search from remote cluster",
"seconds"
);

this.reindexSuccessHistogram = meterRegistry.registerLongHistogram(
REINDEX_SUCCESS_HISTOGRAM,
"Number of successful reindex",
"unit"
);
this.reindexSuccessHistogramRemote = meterRegistry.registerLongHistogram(
REINDEX_SUCCESS_HISTOGRAM_REMOTE,
"Number of successful reindex from remote cluster",
"unit"
);

private ReindexMetrics(LongHistogram reindexTimeSecsHistogram) {
this.reindexTimeSecsHistogram = reindexTimeSecsHistogram;
this.reindexFailureHistogram = meterRegistry.registerLongHistogram(REINDEX_FAILURE_HISTOGRAM, "Number of failed reindex", "unit");
this.reindexFailureHistogramRemote = meterRegistry.registerLongHistogram(
REINDEX_FAILURE_HISTOGRAM_REMOTE,
"Number of failed reindex from remote cluster",
"unit"
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a LongCounter? I don't think it makes sense to use a histogram when every value you're recording is 1L, does it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of using a Counter; my understanding is that a Counter accumulates over time, which is good if we only want to track the total amount at any given time, whereas using a histogram has the advantage of aggregating over an arbitrary period of time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, a counter accumulates over time. In my experience, you then use it by calculating a rate over some period of time e.g. 15mins. That's considered best practice when using Prometheus, for example.

I don't know whether the same thing works in Kibana. It relies on it being cheap and easy to compute rates. We should maybe ask around for advice — either within the team or with some subject expert.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the other option is to ditch this metric and just have a single histogram metric for the durations with the error attribute as well as the source attribute? That would mean combining the listeners, I haven't checked whether that's doable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to find some existing Counter metrics in Kibana, and found that the emitted values are not accumulating, which makes me wonder if my understanding of Counter is actually correct. I have put the details in this thread; hopefully someone can help answer there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @samxbr.

}

public long recordTookTime(long tookTime) {
/**
 * Records the duration of a reindex.
 *
 * @param tookTime duration to record; the histograms are registered with unit "seconds"
 * @param remote   whether the reindex read from a remote cluster; if so the duration is
 *                 recorded on the remote histogram in addition to the overall one
 * @return {@code tookTime}, unchanged
 */
public long recordTookTime(long tookTime, boolean remote) {
    // the overall histogram always gets the measurement
    reindexTimeSecsHistogram.record(tookTime);
    if (remote == false) {
        return tookTime;
    }
    reindexTimeSecsHistogramRemote.record(tookTime);
    return tookTime;
}

/**
 * Records one successful reindex completion.
 *
 * @param remote whether the reindex read from a remote cluster; if so the completion is
 *               recorded on the remote success histogram in addition to the overall one
 */
public void recordSuccess(boolean remote) {
    // the overall histogram always gets the measurement
    reindexSuccessHistogram.record(1L);
    if (remote == false) {
        return;
    }
    reindexSuccessHistogramRemote.record(1L);
}

/**
 * Records one failed reindex completion.
 *
 * @param remote whether the reindex read from a remote cluster; if so the completion is
 *               recorded on the remote failure histogram in addition to the overall one
 */
public void recordFailure(boolean remote) {
    // the overall histogram always gets the measurement
    reindexFailureHistogram.record(1L);
    if (remote == false) {
        return;
    }
    reindexFailureHistogramRemote.record(1L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,49 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
projectResolver.getProjectState(clusterService.state()),
reindexSslConfig,
request,
ActionListener.runAfter(listener, () -> {
long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
if (reindexMetrics != null) {
reindexMetrics.recordTookTime(elapsedTime);
}
})
wrapWithMetrics(listener, reindexMetrics, startTime, request.getRemoteInfo() != null)
);
searchAction.start();
}
);
}

/**
 * Decorates {@code listener} so that the outcome and duration of a reindex are reported to
 * {@code metrics}. Visible for testing.
 * <p>
 * A response that carries at least one bulk or search failure, as well as any call to
 * {@code onFailure}, is recorded as a failure; every other response is recorded as a success.
 * The outcome is recorded before the call is forwarded to {@code listener}; the elapsed time is
 * recorded through {@link ActionListener#runAfter}.
 *
 * @param listener  the listener to notify with the reindex result
 * @param metrics   where to record the measurements; if {@code null}, {@code listener} is returned as is
 * @param startTime start of the reindex, as a {@link System#nanoTime()} reading
 * @param isRemote  whether the reindex reads from a remote cluster
 * @return the decorated listener, or {@code listener} itself when {@code metrics} is {@code null}
 */
static ActionListener<BulkByScrollResponse> wrapWithMetrics(
    ActionListener<BulkByScrollResponse> listener,
    @Nullable ReindexMetrics metrics,
    long startTime,
    boolean isRemote
) {
    if (metrics == null) {
        // nothing to record, no need to wrap
        return listener;
    }

    final ActionListener<BulkByScrollResponse> outcomeRecorder = new ActionListener<>() {
        @Override
        public void onResponse(BulkByScrollResponse response) {
            // a reindex can complete "normally" and still report per-document or search failures
            final boolean completedWithFailures = (response.getBulkFailures() != null && response.getBulkFailures().isEmpty() == false)
                || (response.getSearchFailures() != null && response.getSearchFailures().isEmpty() == false);
            if (completedWithFailures) {
                metrics.recordFailure(isRemote);
            } else {
                metrics.recordSuccess(isRemote);
            }
            listener.onResponse(response);
        }

        @Override
        public void onFailure(Exception e) {
            metrics.recordFailure(isRemote);
            listener.onFailure(e);
        }
    };

    final Runnable durationRecorder = () -> {
        final long elapsedSeconds = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
        metrics.recordTookTime(elapsedSeconds, isRemote);
    };
    return ActionListener.runAfter(outcomeRecorder, durationRecorder);
}

/**
* Build the {@link RestClient} used for reindexing from remote clusters.
* @param remoteInfo connection information for the remote cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,87 @@

import java.util.List;

import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM_REMOTE;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM_REMOTE;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM_REMOTE;

public class ReindexMetricsTests extends ESTestCase {

private RecordingMeterRegistry recordingMeterRegistry;
private RecordingMeterRegistry registry;
private ReindexMetrics metrics;

@Before
public void createMetrics() {
recordingMeterRegistry = new RecordingMeterRegistry();
metrics = new ReindexMetrics(recordingMeterRegistry);
registry = new RecordingMeterRegistry();
metrics = new ReindexMetrics(registry);
}

public void testRecordTookTime() {
int secondsTaken = randomIntBetween(1, 50);
metrics.recordTookTime(secondsTaken);
List<Measurement> measurements = recordingMeterRegistry.getRecorder()
.getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM);
assertEquals(measurements.size(), 1);
assertEquals(measurements.get(0).getLong(), secondsTaken);
long secondsTaken = randomLongBetween(1, Long.MAX_VALUE);

// first metric
metrics.recordTookTime(secondsTaken, false);

List<Measurement> measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM);
assertEquals(1, measurements.size());
assertEquals(secondsTaken, measurements.getFirst().getLong());

// second metric
long remoteSecondsTaken = randomLongBetween(1, Long.MAX_VALUE);
metrics.recordTookTime(remoteSecondsTaken, true);

measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM);
assertEquals(2, measurements.size());
assertEquals(secondsTaken, measurements.getFirst().getLong());
assertEquals(remoteSecondsTaken, measurements.getLast().getLong());
List<Measurement> measurementsRemote = registry.getRecorder()
.getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM_REMOTE);
assertEquals(1, measurementsRemote.size());
assertEquals(remoteSecondsTaken, measurementsRemote.getFirst().getLong());
}

/**
 * Checks that {@code recordSuccess} records a value of 1 on the overall success histogram for
 * every call, and additionally on the remote success histogram when called with {@code true}.
 */
public void testRecordSuccess() {
    // first metric: non-remote success
    metrics.recordSuccess(false);

    List<Measurement> measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM);
    assertEquals(1, measurements.size());
    assertEquals(1, measurements.getFirst().getLong());

    // second metric: remote success, which must show up on both histograms
    metrics.recordSuccess(true);

    measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM);
    assertEquals(2, measurements.size());
    assertEquals(1, measurements.getFirst().getLong());
    assertEquals(1, measurements.getLast().getLong());
    List<Measurement> measurementsRemote = registry.getRecorder()
        .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM_REMOTE);
    assertEquals(1, measurementsRemote.size());
    // check the value of the remote measurement itself, not the overall one again
    assertEquals(1, measurementsRemote.getFirst().getLong());
}

/**
 * Checks that {@code recordFailure} records a value of 1 on the overall failure histogram for
 * every call, and additionally on the remote failure histogram when called with {@code true}.
 */
public void testRecordFailure() {
    // first metric: non-remote failure
    metrics.recordFailure(false);

    List<Measurement> measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM);
    assertEquals(1, measurements.size());
    assertEquals(1, measurements.getFirst().getLong());

    // second metric: remote failure, which must show up on both histograms
    metrics.recordFailure(true);

    measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM);
    assertEquals(2, measurements.size());
    assertEquals(1, measurements.getFirst().getLong());
    assertEquals(1, measurements.getLast().getLong());
    List<Measurement> measurementsRemote = registry.getRecorder()
        .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM_REMOTE);
    assertEquals(1, measurementsRemote.size());
    // check the value of the remote measurement itself, not the overall one again
    assertEquals(1, measurementsRemote.getFirst().getLong());
}
}
Loading