Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion devdocs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
| Kafka | All | All | produce, fetch | Yes | No | Might fail getting topic name for fetch requests in newer versions of kafka (where Fetch api version >= 13) |
| JsonRPC | Go | All | - | Yes | No | N/A |
| GraphQL | All but Go | All | All | Yes | No | N/A |
| Elasticsearch | All but Go | 7.14+ | /_search | Yes | No | N/A |
| Elasticsearch | All but Go | 7.14+ | /_search, /_msearch, /_bulk, /_doc | Yes | No | N/A |
| AWS S3 | All but Go | | CreateBucket, DeleteBucket, PutObject, DeleteObject, ListBuckets, ListObjects, GetObject | Yes | No | N/A |
| AWS SQS | All but Go | | All | Yes | No | N/A |
79 changes: 79 additions & 0 deletions internal/test/integration/components/elasticsearch/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,85 @@ async def search():
sys.exit(1)
return {"status": "OK"}

@app.get("/msearch")
async def msearch():
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_msearch"
searches = [
{},
{
"query": {
"match": {
"message": "this is a test"
}
}
},
{
"index": "my-index-000002"
},
{
"query": {
"match_all": {}
}
}
]
try:
response = requests.post(ELASTICSEARCH_URL, json=searches, headers=HEADERS)

except Exception as e:
print(json.dumps({"error": str(e)}))
sys.exit(1)
return {"status": "OK"}


@app.get("/bulk")
async def bulk():
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_bulk"
actions=[
{
"index": {
"_index": "test",
"_id": "1"
}
},
{
"field1": "value1"
},
{
"delete": {
"_index": "test",
"_id": "2"
}
},
{
"create": {
"_index": "test",
"_id": "3"
}
},
{
"field1": "value3"
},
{
"update": {
"_id": "1",
"_index": "test"
}
},
{
"doc": {
"field2": "value2"
}
}
]
try:
response = requests.post(ELASTICSEARCH_URL, json=actions, headers=HEADERS)

except Exception as e:
print(json.dumps({"error": str(e)}))
sys.exit(1)
return {"status": "OK"}


if __name__ == "__main__":
print(f"Server running: port={8080} process_id={os.getpid()}")
uvicorn.run(app, host="0.0.0.0", port=8080)
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ services:
OTEL_EBPF_LOG_CONFIG: "yaml"
OTEL_EBPF_BPF_DEBUG: "TRUE"
OTEL_EBPF_HOSTNAME: "beyla"
OTEL_EBPF_BPF_HTTP_REQUEST_TIMEOUT: "5s"
OTEL_EBPF_BPF_HTTP_REQUEST_TIMEOUT: "30s"
OTEL_EBPF_PROCESSES_INTERVAL: "100ms"
OTEL_EBPF_METRICS_FEATURES: "application"
OTEL_EBPF_BPF_BUFFER_SIZE_HTTP: 1024
Expand Down
42 changes: 37 additions & 5 deletions internal/test/integration/test_python_elasticsearchclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func testPythonElasticsearch(t *testing.T) {
// populate elasticsearch with a custom value
populate(t, url)
testElasticsearchSearch(t, comm, url, index)
// populate is optional, the elasticsearch request will fail
// but we will have the span
testElasticsearchMsearch(t, comm, url)
testElasticsearchBulk(t, comm, url)
testElasticsearchDoc(t, comm, index)
}

func populate(t *testing.T, url string) {
Expand All @@ -38,7 +43,7 @@ func populate(t *testing.T, url string) {
}

func testElasticsearchSearch(t *testing.T, comm, url, index string) {
queryText := "{\"query\":{\"match\":{\"name\":\"OBI\"}}}"
queryText := "{\"query\": {\"match\": {\"name\": \"OBI\"}}}"
urlPath := "/search"
ti.DoHTTPGet(t, url+urlPath, 200)

Expand All @@ -48,8 +53,13 @@ func testElasticsearchSearch(t *testing.T, comm, url, index string) {
func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index string) {
params := neturl.Values{}
params.Add("service", comm)
operatioName := op + " " + index
params.Add("operationName", operatioName)
var operationName string
if index != "" {
operationName = op + " " + index
} else {
operationName = op
}
params.Add("operationName", operationName)
fullJaegerURL := fmt.Sprintf("%s?%s", jaegerQueryURL, params.Encode())

test.Eventually(t, testTimeout, func(t require.TestingT) {
Expand All @@ -67,11 +77,11 @@ func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index strin
lastTrace := traces[len(traces)-1]
span := lastTrace.Spans[0]

assert.Equal(t, operatioName, span.OperationName)
assert.Contains(t, span.OperationName, operationName)

tag, found := jaeger.FindIn(span.Tags, "db.query.text")
assert.True(t, found)
assert.JSONEq(t, queryText, tag.Value.(string))
assert.Equal(t, queryText, tag.Value.(string))

tag, found = jaeger.FindIn(span.Tags, "db.collection.name")
assert.True(t, found)
Expand All @@ -90,3 +100,25 @@ func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index strin
assert.Empty(t, tag.Value)
}, test.Interval(100*time.Millisecond))
}

func testElasticsearchMsearch(t *testing.T, comm, url string) {
queryText := "[{}, {\"query\": {\"match\": {\"message\": \"this is a test\"}}}, {\"index\": \"my-index-000002\"}, {\"query\": {\"match_all\": {}}}]"
urlPath := "/msearch"
ti.DoHTTPGet(t, url+urlPath, 200)

assertElasticsearchOperation(t, comm, "msearch", queryText, "")
}

func testElasticsearchBulk(t *testing.T, comm, url string) {
queryText := "[{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}, {\"field1\": \"value1\"}, {\"delete\": {\"_index\": \"test\", \"_id\": \"2\"}}, {\"create\": {\"_index\": \"test\", \"_id\": \"3\"}}, {\"field1\": \"value3\"}, {\"update\": {\"_id\": \"1\", \"_index\": \"test\"}}, {\"doc\": {\"field2\": \"value2\"}}]"
urlPath := "/bulk"
ti.DoHTTPGet(t, url+urlPath, 200)

assertElasticsearchOperation(t, comm, "bulk", queryText, "")
}

func testElasticsearchDoc(t *testing.T, comm, index string) {
queryText := "{\"name\": \"OBI\", \"description\": \"very cool\"}"

assertElasticsearchOperation(t, comm, "doc", queryText, index)
}
105 changes: 56 additions & 49 deletions pkg/ebpf/common/http/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package ebpfcommon

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -20,19 +19,34 @@ import (
type elasticsearchOperation struct {
NodeName string
DBQueryText string
DBOperationName string
DBCollectionName string
}

const (
pathSearch string = "_search"
)
var elasticsearchOperationMethods = map[string]map[string]struct{}{
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-search
"search": {http.MethodPost: {}, http.MethodGet: {}},
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-msearch
"msearch": {http.MethodPost: {}, http.MethodGet: {}},
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk
"bulk": {http.MethodPost: {}, http.MethodPut: {}},
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-get
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-index
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-exists
"doc": {http.MethodGet: {}, http.MethodPost: {}, http.MethodPut: {}, http.MethodHead: {}, http.MethodDelete: {}},
}

func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Response) (request.Span, bool) {
if !isElasticsearchResponse(resp) {
return *baseSpan, false
}
if err := isSearchRequest(req); err != nil {

operationName := extractElasticsearchOperationName(req)
if operationName == "" {
return *baseSpan, false
}

if err := isElasticsearchSupportedRequest(operationName, req.Method); err != nil {
slog.Debug(err.Error())
return *baseSpan, false
}
Expand All @@ -42,19 +56,14 @@ func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Res
slog.Debug("parse Elasticsearch request", "error", err)
return *baseSpan, false
}

if resp != nil {
if v := resp.Header.Get("X-Found-Handling-Instance"); v != "" {
op.NodeName = v
}
} else {
op.NodeName = req.URL.Host
if v := resp.Header.Get("X-Found-Handling-Instance"); v != "" {
op.NodeName = v
}

baseSpan.SubType = request.HTTPSubtypeElasticsearch
baseSpan.Elasticsearch = &request.Elasticsearch{
NodeName: op.NodeName,
DBOperationName: op.DBOperationName,
DBOperationName: operationName,
DBCollectionName: op.DBCollectionName,
DBQueryText: op.DBQueryText,
}
Expand All @@ -67,42 +76,23 @@ func parseElasticsearchRequest(req *http.Request) (elasticsearchOperation, error
if err != nil {
return op, fmt.Errorf("failed to read Elasticsearch request body %w", err)
}

req.Body = io.NopCloser(bytes.NewBuffer(reqB))
if len(reqB) == 0 {
op.DBQueryText = ""
} else {
dbQueryText, err := extractDBQueryText(reqB)
if err != nil {
return op, err
}
op.DBQueryText = dbQueryText
}
op.DBOperationName = extractOperationName(req)
op.DBCollectionName = extractDBCollectionName(req)
op.DBQueryText = string(reqB)
op.DBCollectionName = extractElasticsearchDBCollectionName(req)
return op, nil
}

func extractDBQueryText(body []byte) (string, error) {
var buf bytes.Buffer

if err := json.Compact(&buf, body); err != nil {
return "", fmt.Errorf("invalid Elasticsearch JSON body: %w", err)
func isElasticsearchSupportedRequest(operationName, methodName string) error {
methods, exists := elasticsearchOperationMethods[operationName]
if !exists {
return errors.New("parse Elasticsearch request: unsupported endpoint")
}

return buf.String(), nil
}

func isSearchRequest(req *http.Request) error {
// let's focus only on _search operation that has only GET and POST http methods
if !strings.Contains(req.URL.Path, pathSearch) {
return errors.New("parse Elasticsearch search request: unsupported endpoint")
}

if req.Method != http.MethodGet && req.Method != http.MethodPost {
return errors.New("parse Elasticsearch search request: unsupported method")
_, supported := methods[methodName]
if supported {
return nil
}
return nil
return fmt.Errorf("parse Elasticsearch %s request: unsupported method %s", operationName, methodName)
}

// isElasticsearchResponse checks if X-Elastic-Product HTTP header is present.
Expand All @@ -114,26 +104,43 @@ func isElasticsearchResponse(resp *http.Response) bool {
return headerValue == expectedValue
}

// extractOperationName is a generic function used to extract the operation name
// extractElasticsearchOperationName is a generic function used to extract the operation name
// that is the endpoint identifier provided in the request
func extractOperationName(req *http.Request) string {
// we can have different operations where the name of the operation is found in
// the last or second to last part of the url
func extractElasticsearchOperationName(req *http.Request) string {
path := strings.Trim(req.URL.Path, "/")
if path == "" {
return ""
}

parts := strings.Split(path, "/")
if len(parts) == 0 {
return ""
}
name := parts[len(parts)-1]
return strings.TrimPrefix(name, "_")

lastPart := parts[len(parts)-1]
possibleOperationName := strings.TrimPrefix(lastPart, "_")

if _, found := elasticsearchOperationMethods[possibleOperationName]; found {
return possibleOperationName
}

if len(parts) >= 2 {
secondLastPart := parts[len(parts)-2]
possibleOperationName = strings.TrimPrefix(secondLastPart, "_")
if _, found := elasticsearchOperationMethods[possibleOperationName]; found {
return possibleOperationName
}
}
return ""
}

// extractDBCollectionName takes into account this rule from semconv
// extractElasticsearchDBCollectionName takes into account this rule from semconv
// The query may target multiple indices or data streams,
// in which case it SHOULD be a comma separated list of those.
// If the query doesn’t target a specific index, this field MUST NOT be set.
func extractDBCollectionName(req *http.Request) string {
func extractElasticsearchDBCollectionName(req *http.Request) string {
path := strings.Trim(req.URL.Path, "/")
if path == "" {
return ""
Expand Down
Loading
Loading