Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [Unreleased]

- Added new HTTP Sink configuration options: `gid.connector.http.sink.success-codes`, `gid.connector.http.sink.retry-codes`, and `gid.connector.http.sink.ignored-response-codes`.
- Amend to not log HTTP request response and header values by default.
- Added http 2 support.

Expand Down
30 changes: 19 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -452,16 +452,21 @@ is provided.
## HTTP status code handler
### Sink table
You can configure a list of HTTP status codes that should be treated as errors for HTTP sink table.
By default all 400 and 500 response codes will be interpreted as error code.
By default all 400 and 500 response codes will be interpreted as error code. 500, 503, and 504 response codes will be interpreted as retry.

The sink categorizes HTTP responses into groups:
- Success codes (`gid.connector.http.sink.success-codes`): Expected successful responses. `1XX, 2XX, 3XX` are defaults
- Retry codes (`gid.connector.http.sink.retry-codes`): Transient errors that trigger automatic retries when using `at-least-once` delivery guarantee. `500, 503, 504` are defaults
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

500s should not be retried - these are internal server errors - not retriable codes. I do not think we should have any defaults.

Copy link
Author

@jonathanlehto jonathanlehto Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 500 default was done to match the lookup table defaults. I'm okay with removing 500, but, should we do the same to lookup as well? (probably outside the bounds of this pull request) I'm not sure if we need the two to match exactly as the use case is a bit different

- Ignored responses (`gid.connector.http.sink.ignored-response-codes`): Responses whose content is ignored but treated as successful.
- Error codes: Any response code not classified in the above groups.

Parameters support whitelisting and blacklisting: `2XX,404,!203` means all codes from 200-299, plus 404, except 203.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest not using whitelisting and blacklisting as the language is questionable- I have made this change in the apache version.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I will switch it to denylist & allowlist, but please let me know if you had another naming convention in my mind!


#### Legacy error code configuration
For backward compatibility, you can use the legacy properties:
- `gid.connector.http.sink.error.code` - HTTP status codes treated as errors (supports masks like `3XX, 4XX, 5XX`).
- `gid.connector.http.sink.error.code.exclude` - HTTP codes to exclude from the error list.

This behavior can be changed by using below properties in table definition (DDL) or passing it via `setProperty' method from Sink's builder. The property name are:
- `gid.connector.http.sink.error.code` used to defined HTTP status code value that should be treated as error for example 404.
Many status codes can be defined in one value, where each code should be separated with comma, for example:
`401, 402, 403`. User can use this property also to define a type code mask. In that case, all codes from given HTTP response type will be treated as errors.
An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s and 500s status codes will be treated as errors.
- `gid.connector.http.sink.error.code.exclude` used to exclude a HTTP code from error list.
Many status codes can be defined in one value, where each code should be separated with comma, for example:
`401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes.

### Source table
The source table categorizes HTTP responses into three groups based on status codes:
Expand Down Expand Up @@ -617,8 +622,11 @@ be requested if the current time is later than the cached token expiry time minu
| sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. |
| sink.flush-buffer.timeout | optional | Threshold time in milliseconds for an element to be in a buffer before being flushed. |
| gid.connector.http.sink.request-callback | optional | Specify which `HttpPostRequestCallback` implementation to use. By default, it is set to `slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`. |
| gid.connector.http.sink.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. |
| gid.connector.http.sink.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. |
| gid.connector.http.sink.error.code `DEPRECATED` | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. |
| gid.connector.http.sink.error.code.exclude `DEPRECATED` | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. |
| gid.connector.http.sink.success-codes | optional | Comma separated http codes considered as success response. Use [1-5]XX for groups and '!' character for excluding. Defaults are `1XX,2XX,3XX` |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the default should be 200, as was the case for the lookup.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1XX,2XX,3XX was done to preserve backwards compatibility to the existing/old configuration. I am okay with just 2XX, but should we place 1XX and 3XX into a default ignore code set?

| gid.connector.http.sink.retry-codes | optional | Comma separated http codes considered as transient errors that will trigger retries. Use [1-5]XX for groups and '!' character for excluding. Only used when `sink.delivery-guarantee` is set to `at-least-once`. Defaults are `500,503,504` |
| gid.connector.http.sink.ignored-response-codes | optional | Comma separated http codes. Content for these responses will be ignored. Use [1-5]XX for groups and '!' character for excluding. |
| gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. |
| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.getindata.connectors.http;

import java.util.List;

import lombok.Getter;

import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;

@Getter
public class BatchHttpStatusCodeValidationFailedException extends Exception {
private final List<HttpRequest> failedRequests;

public BatchHttpStatusCodeValidationFailedException(String message, List<HttpRequest> failedRequests) {
super(message);
this.failedRequests = failedRequests;
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,57 @@
package com.getindata.connectors.http.internal;

import java.util.List;
import java.util.stream.Collectors;

import lombok.Data;
import lombok.NonNull;
import lombok.ToString;

import com.getindata.connectors.http.internal.config.ResponseItemStatus;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;

/**
* Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
* to write, divided into two lists &mdash; successful and failed ones.
* to write.
*/
@Data
@ToString
public class SinkHttpClientResponse {

/**
* A list of successfully written requests.
* A list of requests along with write status.
*/
@NonNull
private final List<HttpRequest> successfulRequests;
private final List<ResponseItem> requests;

/**
* A list of requests that {@link SinkHttpClient} failed to write.
*/
@NonNull
private final List<HttpRequest> failedRequests;
private List<HttpRequest> getRequestByStatus(final ResponseItemStatus status) {
return requests.stream()
.filter(r -> r.getStatus().equals(status))
.map(ResponseItem::getRequest)
.collect(Collectors.toList());
}

public List<HttpRequest> getSuccessfulRequests() {
return getRequestByStatus(ResponseItemStatus.SUCCESS);
}

public List<HttpRequest> getFailedRequests() {
return getRequestByStatus(ResponseItemStatus.FAILURE);
}

public List<HttpRequest> getTemporalRequests() {
return getRequestByStatus(ResponseItemStatus.TEMPORAL);
}

public List<HttpRequest> getIgnoredRequests() {
return getRequestByStatus(ResponseItemStatus.IGNORE);
}

@Data
@ToString
public static class ResponseItem {
private final HttpRequest request;
private final ResponseItemStatus status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public final class HttpConnectorConfigConstants {
*/
public static final String GID_CONNECTOR_HTTP = "gid.connector.http.";
private static final String SOURCE_LOOKUP_PREFIX = GID_CONNECTOR_HTTP + "source.lookup.";
private static final String SINK_PREFIX = GID_CONNECTOR_HTTP + "sink.";

/**
* A property prefix for http connector header properties
Expand All @@ -45,9 +46,13 @@ public final class HttpConnectorConfigConstants {
public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX + "result-type";

// --------- Error code handling configuration ---------
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "sink.error.code.exclude";
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = SINK_PREFIX + "error.code.exclude";
public static final String HTTP_ERROR_SINK_CODES_LIST = SINK_PREFIX + "error.code";

public static final String SINK_SUCCESS_CODES = SINK_PREFIX + "success-codes";
public static final String SINK_RETRY_CODES = SINK_PREFIX + "retry-codes";
public static final String SINK_IGNORE_RESPONSE_CODES = SINK_PREFIX + "ignored-response-codes";

public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";
// -----------------------------------------------------

public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.getindata.connectors.http.internal.config;

public enum ResponseItemStatus {
SUCCESS("success"),
TEMPORAL("temporal"),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think RETRIABLE would be better than TEMPORAL.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay! I used to TEMPORAL to match the lookup table's language. Eventually, we may want to change it their as well

IGNORE("ignore"),
FAILURE("failure");

private final String status;

ResponseItemStatus(String status) {
this.status = status;
}

public String getStatus() {
return status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import com.getindata.connectors.http.BatchHttpStatusCodeValidationFailedException;
import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.utils.ThreadUtils;

/**
Expand Down Expand Up @@ -89,20 +91,44 @@ protected void submitRequestEntries(
if (err != null) {
int failedRequestsNumber = requestEntries.size();
log.error(
"Http Sink fatally failed to write all {} requests",
failedRequestsNumber);
"Http Sink fatally failed to write {} requests",
failedRequestsNumber,
err
);
numRecordsSendErrorsCounter.inc(failedRequestsNumber);

// TODO: Make `HttpSinkInternal` retry the failed requests.
// Currently, it does not retry those at all, only adds their count
// to the `numRecordsSendErrors` metric. It is due to the fact we do not have
// a clear image how we want to do it, so it would be both efficient and correct.
//requestResult.accept(requestEntries);
} else if (response.getFailedRequests().size() > 0) {
int failedRequestsNumber = response.getFailedRequests().size();
log.error("Http Sink failed to write and will retry {} requests",
failedRequestsNumber);
numRecordsSendErrorsCounter.inc(failedRequestsNumber);
} else {
List<HttpRequest> failedRequests = response.getFailedRequests();
List<HttpRequest> ignoredRequests = response.getIgnoredRequests();
List<HttpRequest> temporalRequests = response.getTemporalRequests();

if (!failedRequests.isEmpty()) {
numRecordsSendErrorsCounter.inc(failedRequests.size());
log.error(
"failed requests: {}, throwing BatchHttpStatusCodeValidationFailedException from sink",
failedRequests
);
getFatalExceptionCons().accept(new BatchHttpStatusCodeValidationFailedException(
String.format("Received %d fatal response codes", failedRequests.size()), failedRequests)
);
}

if (!ignoredRequests.isEmpty()) {
log.info("Ignoring {} requests", ignoredRequests.size());
}

if (!temporalRequests.isEmpty()) {
numRecordsSendErrorsCounter.inc(temporalRequests.size());
log.error(
"Http Sink failed to write {} temporal requests",
temporalRequests.size()
);
}

// TODO: Make `HttpSinkInternal` retry the failed requests. Currently,
// it does not retry those at all, only adds their count to the
Expand Down
Loading