Skip to content

Commit c23e414

Browse files
committed
HTTP-179 Allow HTTP version to be specified
Signed-off-by: davidradl <david_radley@uk.ibm.com>
1 parent 9604080 commit c23e414

File tree

10 files changed

+73
-21
lines changed

10 files changed

+73
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
## [Unreleased]
44

5+
- Ability to specify http versions for http lookups.
56
- Amend to not log HTTP request response and header values by default.
6-
- Added http 2 support.
77

88
## [0.22.0] - 2025-10-03
99

README.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@ The HTTP TableLookup connector that allows for pulling data from external system
99
Please use [releases](https://github.com/getindata/flink-http-connector/releases) instead of the `main` branch in order to get a stable set of binaries.
1010

1111
The goal for HTTP TableLookup connector was to use it in Flink SQL statement as a standard table that can be later joined with other stream using pure SQL Flink.
12-
13-
Currently, HTTP source connector supports only Lookup Joins (TableLookup) [1] in Table/SQL API.
14-
`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/internal/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicTableSinkFactory.java)).
15-
Note that the connector will work with both http 1.1 and http 2 endpoints.
12+
13+
`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/internal/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicTableSinkFactory.java)).
1614

1715
## Updating the connector
1816
In case of updating http-connector please see [Breaking changes](#breaking-changes) section.
@@ -581,6 +579,7 @@ be requested if the current time is later than the cached token expiry time minu
581579
| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
582580
| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
583581
| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
582+
| gid.connector.http.source.lookup.http-version | optional | Version of HTTP to use for lookup http requests. The valid values are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1 endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and 'HTTP/2 upgrade not supported'. |
584583
| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
585584
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
586585
| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ public final class HttpConnectorConfigConstants {
5959
public static final String SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER =
6060
SOURCE_LOOKUP_PREFIX + "query-creator";
6161

62+
public static final String SOURCE_LOOKUP_QUERY_HTTP_VERSION =
63+
SOURCE_LOOKUP_PREFIX + "http-version";
64+
6265
// -------------- HTTPS security settings --------------
6366
public static final String ALLOW_SELF_SIGNED =
6467
GID_CONNECTOR_HTTP + "security.cert.server.allowSelfSigned";

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> reqeustBatch, UR
124124
Builder requestBuilder = java.net.http.HttpRequest
125125
.newBuilder()
126126
.uri(endpointUri)
127-
.version(Version.HTTP_2)
127+
.version(Version.HTTP_1_1) // consider allowing users to specify http 2 to override
128128
.timeout(Duration.ofSeconds(httpRequestTimeOutSeconds))
129129
.method(method, publisher);
130130

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private HttpRequest buildHttpRequest(HttpSinkRequestEntry requestEntry, URI endp
6262
Builder requestBuilder = java.net.http.HttpRequest
6363
.newBuilder()
6464
.uri(endpointUri)
65-
.version(Version.HTTP_2)
65+
.version(Version.HTTP_1_1) // consider allowing users to specify http 2 to override
6666
.timeout(Duration.ofSeconds(httpRequestTimeOutSeconds))
6767
.method(requestEntry.method,
6868
BodyPublishers.ofByteArray(requestEntry.element));

src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import java.net.http.HttpRequest;
66
import java.net.http.HttpRequest.BodyPublishers;
77
import java.net.http.HttpRequest.Builder;
8-
import java.time.Duration;
98

109
import lombok.extern.slf4j.Slf4j;
1110
import org.slf4j.Logger;
@@ -42,10 +41,11 @@ public BodyBasedRequestFactory(
4241
*/
4342
@Override
4443
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
45-
return HttpRequest.newBuilder()
44+
HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo);
45+
builder
4646
.uri(constructUri(lookupQueryInfo))
47-
.method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()))
48-
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
47+
.method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()));
48+
return builder;
4949
}
5050

5151
@Override

src/main/java/com/getindata/connectors/http/internal/table/lookup/GetRequestFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.net.URISyntaxException;
55
import java.net.http.HttpRequest;
66
import java.net.http.HttpRequest.Builder;
7-
import java.time.Duration;
87

98
import lombok.extern.slf4j.Slf4j;
109
import org.slf4j.Logger;
@@ -48,10 +47,11 @@ protected Logger getLogger() {
4847
*/
4948
@Override
5049
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
51-
return HttpRequest.newBuilder()
50+
HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo);
51+
builder
5252
.uri(constructGetUri(lookupQueryInfo))
53-
.GET()
54-
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
53+
.GET();
54+
return builder;
5555
}
5656

5757
URI constructGetUri(LookupQueryInfo lookupQueryInfo) {

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ public class HttpLookupConnectorOptions {
3939
.stringType()
4040
.noDefaultValue();
4141

42+
public static final ConfigOption<String> LOOKUP_HTTP_VERSION =
43+
ConfigOptions.key(SOURCE_LOOKUP_QUERY_HTTP_VERSION)
44+
.stringType()
45+
.noDefaultValue()
46+
.withDescription("Version of HTTP to use for lookup HTTP requests. " +
47+
"The valid values are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2" +
48+
" respectively. This option may be required as HTTP_1_1, if the" +
49+
" endpoint is http 1.1, because some http 1.1 endpoints reject HTTP" +
50+
" Version 2 calls, with 'Invalid HTTP request received' and " +
51+
" 'HTTP/2 upgrade not supported'.");
52+
4253
public static final ConfigOption<String> LOOKUP_REQUEST_FORMAT =
4354
ConfigOptions.key("lookup-request.format")
4455
.stringType()

src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

3+
import java.net.http.HttpClient;
34
import java.net.http.HttpRequest;
45
import java.net.http.HttpRequest.Builder;
6+
import java.time.Duration;
57
import java.util.Arrays;
68
import java.util.Map;
79

@@ -38,6 +40,7 @@ public abstract class RequestFactoryBase implements HttpRequestFactory {
3840
*/
3941
private final String[] headersAndValues;
4042
private final HttpLookupConfig options;
43+
final HttpClient.Version httpVersion;
4144

4245
public RequestFactoryBase(
4346
LookupQueryCreator lookupQueryCreator,
@@ -65,6 +68,12 @@ public RequestFactoryBase(
6568
DEFAULT_REQUEST_TIMEOUT_SECONDS
6669
)
6770
);
71+
String httpVersionFromConfig = options.getReadableConfig().get(HttpLookupConnectorOptions.LOOKUP_HTTP_VERSION);
72+
if (httpVersionFromConfig == null) {
73+
httpVersion = null;
74+
} else {
75+
httpVersion = HttpClient.Version.valueOf(httpVersionFromConfig);
76+
}
6877
}
6978

7079
@Override
@@ -88,7 +97,14 @@ public HttpLookupSourceRequestEntry buildLookupRequest(RowData lookupRow) {
8897
* @param lookupQuery lookup query used for request query parameters or body.
8998
* @return {@link HttpRequest.Builder} for given lookupQuery.
9099
*/
91-
protected abstract Builder setUpRequestMethod(LookupQueryInfo lookupQuery);
100+
protected Builder setUpRequestMethod(LookupQueryInfo lookupQuery) {
101+
HttpRequest.Builder builder = HttpRequest.newBuilder()
102+
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
103+
if (httpVersion !=null) {
104+
builder.version(httpVersion);
105+
}
106+
return builder;
107+
}
92108

93109
protected static StringBuilder resolvePathParameters(LookupQueryInfo lookupQueryInfo,
94110
StringBuilder resolvedUrl) {

src/test/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactoryTest.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,56 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

3+
34
import java.net.URI;
5+
import java.net.http.HttpClient;
46
import java.util.Collection;
7+
import java.util.HashSet;
58
import java.util.Map;
9+
import java.util.Set;
610

11+
import org.apache.flink.configuration.Configuration;
712
import org.jetbrains.annotations.NotNull;
813
import org.junit.jupiter.params.ParameterizedTest;
914
import org.junit.jupiter.params.provider.MethodSource;
1015
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
1116
import static org.assertj.core.api.Assertions.assertThat;
1217

18+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_HTTP_VERSION;
19+
1320
public class BodyBasedRequestFactoryTest {
1421

1522
@ParameterizedTest
1623
@MethodSource("configProvider")
1724
void testconstructUri(TestSpec testSpec) throws Exception {
18-
LookupQueryInfo lookupQueryInfo = new LookupQueryInfo(testSpec.url,
25+
Set<Configuration> configs = new HashSet();
26+
27+
Configuration configuration= new Configuration();
28+
Configuration configuration_http11 = new Configuration();
29+
Configuration configuration_http2 = new Configuration();
30+
31+
configuration_http2.setString(LOOKUP_HTTP_VERSION, String.valueOf(HttpClient.Version.HTTP_2));
32+
configuration_http11.setString(LOOKUP_HTTP_VERSION, String.valueOf(HttpClient.Version.HTTP_1_1));
33+
34+
configs.add(configuration);
35+
configs.add(configuration_http11);
36+
configs.add(configuration_http2);
37+
38+
for(Configuration config: configs) {
39+
LookupQueryInfo lookupQueryInfo = new LookupQueryInfo(testSpec.url,
1940
testSpec.bodyBasedUrlQueryParams,
2041
testSpec.pathBasedUrlParams);
21-
HttpLookupConfig httpLookupConfig = HttpLookupConfig.builder()
42+
HttpLookupConfig httpLookupConfig = HttpLookupConfig.builder()
2243
.lookupMethod(testSpec.lookupMethod)
2344
.url(testSpec.url)
2445
.useAsync(false)
46+
.readableConfig(config)
2547
.build();
26-
BodyBasedRequestFactory bodyBasedRequestFactory =
48+
BodyBasedRequestFactory bodyBasedRequestFactory =
2749
new BodyBasedRequestFactory("test", null, null, httpLookupConfig);
2850

29-
URI uri = bodyBasedRequestFactory.constructUri(lookupQueryInfo);
30-
assertThat(uri.toString()).isEqualTo(testSpec.expected);
51+
URI uri = bodyBasedRequestFactory.constructUri(lookupQueryInfo);
52+
assertThat(uri.toString()).isEqualTo(testSpec.expected);
53+
}
3154
}
3255

3356
private static class TestSpec {

0 commit comments

Comments
 (0)