Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

## [Unreleased]

- Ability to specify http versions for http lookups.
- Amend to not log HTTP request response and header values by default.
- Added http 2 support.

## [0.22.0] - 2025-10-03

Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ The HTTP TableLookup connector that allows for pulling data from external system
Please use [releases](https://github.com/getindata/flink-http-connector/releases) instead of the `main` branch in order to get a stable set of binaries.

The goal for HTTP TableLookup connector was to use it in Flink SQL statement as a standard table that can be later joined with other stream using pure SQL Flink.

Currently, HTTP source connector supports only Lookup Joins (TableLookup) [1] in Table/SQL API.
`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/internal/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicTableSinkFactory.java)).
Note that the connector will work with both http 1.1 and http 2 endpoints.

`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/internal/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicTableSinkFactory.java)).

## Updating the connector
In case of updating http-connector please see [Breaking changes](#breaking-changes) section.
Expand Down Expand Up @@ -581,6 +579,7 @@ be requested if the current time is later than the cached token expiry time minu
| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
| gid.connector.http.source.lookup.http-version | optional | Version of HTTP to use for lookup http requests. The valid values are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1 endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and 'HTTP/2 upgrade not supported'. |
| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public final class HttpConnectorConfigConstants {
public static final String SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER =
SOURCE_LOOKUP_PREFIX + "query-creator";

public static final String SOURCE_LOOKUP_QUERY_HTTP_VERSION =
SOURCE_LOOKUP_PREFIX + "http-version";

// -------------- HTTPS security settings --------------
public static final String ALLOW_SELF_SIGNED =
GID_CONNECTOR_HTTP + "security.cert.server.allowSelfSigned";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> reqeustBatch, UR
Builder requestBuilder = java.net.http.HttpRequest
.newBuilder()
.uri(endpointUri)
.version(Version.HTTP_2)
.version(Version.HTTP_1_1) // consider allowing users to specify http 2 to override
.timeout(Duration.ofSeconds(httpRequestTimeOutSeconds))
.method(method, publisher);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private HttpRequest buildHttpRequest(HttpSinkRequestEntry requestEntry, URI endp
Builder requestBuilder = java.net.http.HttpRequest
.newBuilder()
.uri(endpointUri)
.version(Version.HTTP_2)
.version(Version.HTTP_1_1) // consider allowing users to specify http 2 to override
.timeout(Duration.ofSeconds(httpRequestTimeOutSeconds))
.method(requestEntry.method,
BodyPublishers.ofByteArray(requestEntry.element));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
import java.time.Duration;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
Expand Down Expand Up @@ -42,10 +41,11 @@ public BodyBasedRequestFactory(
*/
@Override
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
return HttpRequest.newBuilder()
HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo);
builder
.uri(constructUri(lookupQueryInfo))
.method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()))
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
.method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()));
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.net.URISyntaxException;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.Builder;
import java.time.Duration;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
Expand Down Expand Up @@ -48,10 +47,11 @@ protected Logger getLogger() {
*/
@Override
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
return HttpRequest.newBuilder()
HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo);
builder
.uri(constructGetUri(lookupQueryInfo))
.GET()
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
.GET();
return builder;
}

URI constructGetUri(LookupQueryInfo lookupQueryInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ public class HttpLookupConnectorOptions {
.stringType()
.noDefaultValue();

public static final ConfigOption<String> LOOKUP_HTTP_VERSION =
ConfigOptions.key(SOURCE_LOOKUP_QUERY_HTTP_VERSION)
.stringType()
.noDefaultValue()
.withDescription("Version of HTTP to use for lookup HTTP requests. " +
"The valid values are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2" +
" respectively. This option may be required as HTTP_1_1, if the" +
" endpoint is http 1.1, because some http 1.1 endpoints reject HTTP" +
" Version 2 calls, with 'Invalid HTTP request received' and " +
" 'HTTP/2 upgrade not supported'.");

public static final ConfigOption<String> LOOKUP_REQUEST_FORMAT =
ConfigOptions.key("lookup-request.format")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.Builder;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;

Expand Down Expand Up @@ -38,6 +40,7 @@ public abstract class RequestFactoryBase implements HttpRequestFactory {
*/
private final String[] headersAndValues;
private final HttpLookupConfig options;
final HttpClient.Version httpVersion;

public RequestFactoryBase(
LookupQueryCreator lookupQueryCreator,
Expand Down Expand Up @@ -65,6 +68,12 @@ public RequestFactoryBase(
DEFAULT_REQUEST_TIMEOUT_SECONDS
)
);
String httpVersionFromConfig = options.getReadableConfig().get(HttpLookupConnectorOptions.LOOKUP_HTTP_VERSION);
if (httpVersionFromConfig == null) {
httpVersion = null;
} else {
httpVersion = HttpClient.Version.valueOf(httpVersionFromConfig);
}
}

@Override
Expand All @@ -88,7 +97,14 @@ public HttpLookupSourceRequestEntry buildLookupRequest(RowData lookupRow) {
* @param lookupQuery lookup query used for request query parameters or body.
* @return {@link HttpRequest.Builder} for given lookupQuery.
*/
protected abstract Builder setUpRequestMethod(LookupQueryInfo lookupQuery);
protected Builder setUpRequestMethod(LookupQueryInfo lookupQuery) {
HttpRequest.Builder builder = HttpRequest.newBuilder()
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
if (httpVersion !=null) {
builder.version(httpVersion);
}
return builder;
}

protected static StringBuilder resolvePathParameters(LookupQueryInfo lookupQueryInfo,
StringBuilder resolvedUrl) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,56 @@
package com.getindata.connectors.http.internal.table.lookup;


import java.net.URI;
import java.net.http.HttpClient;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.flink.configuration.Configuration;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import static org.assertj.core.api.Assertions.assertThat;

import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_HTTP_VERSION;

public class BodyBasedRequestFactoryTest {

@ParameterizedTest
@MethodSource("configProvider")
void testconstructUri(TestSpec testSpec) throws Exception {
LookupQueryInfo lookupQueryInfo = new LookupQueryInfo(testSpec.url,
Set<Configuration> configs = new HashSet();

Configuration configuration= new Configuration();
Configuration configuration_http11 = new Configuration();
Configuration configuration_http2 = new Configuration();

configuration_http2.setString(LOOKUP_HTTP_VERSION, String.valueOf(HttpClient.Version.HTTP_2));
configuration_http11.setString(LOOKUP_HTTP_VERSION, String.valueOf(HttpClient.Version.HTTP_1_1));

configs.add(configuration);
configs.add(configuration_http11);
configs.add(configuration_http2);

for(Configuration config: configs) {
LookupQueryInfo lookupQueryInfo = new LookupQueryInfo(testSpec.url,
testSpec.bodyBasedUrlQueryParams,
testSpec.pathBasedUrlParams);
HttpLookupConfig httpLookupConfig = HttpLookupConfig.builder()
HttpLookupConfig httpLookupConfig = HttpLookupConfig.builder()
.lookupMethod(testSpec.lookupMethod)
.url(testSpec.url)
.useAsync(false)
.readableConfig(config)
.build();
BodyBasedRequestFactory bodyBasedRequestFactory =
BodyBasedRequestFactory bodyBasedRequestFactory =
new BodyBasedRequestFactory("test", null, null, httpLookupConfig);

URI uri = bodyBasedRequestFactory.constructUri(lookupQueryInfo);
assertThat(uri.toString()).isEqualTo(testSpec.expected);
URI uri = bodyBasedRequestFactory.constructUri(lookupQueryInfo);
assertThat(uri.toString()).isEqualTo(testSpec.expected);
}
}

private static class TestSpec {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpRequest;

import org.junit.jupiter.api.Test;

class Slf4JHttpLookupPostRequestCallbackTest {
@Test
public void testNullResponseDoesNotError() throws URISyntaxException {
HttpRequest httpRequest = HttpRequest.newBuilder()
.method("GET", HttpRequest.BodyPublishers.ofString("foo"))
.uri(new URI("http://testing123")).build();
HttpLookupSourceRequestEntry requestEntry =
new HttpLookupSourceRequestEntry(httpRequest, new LookupQueryInfo(""));
Slf4JHttpLookupPostRequestCallback slf4JHttpLookupPostRequestCallback =
new Slf4JHttpLookupPostRequestCallback();
slf4JHttpLookupPostRequestCallback.call(null, requestEntry, "aaa", null);
}
}
Loading