
Commit b923cd0

kjswaruph and phipag authored
feat: add CRaC priming support to powertools-kafka module (#2145)
* feat: add CRaC priming support to powertools-kafka module
  - Add Maven test profile and classesloaded.txt for preloading
  - Add crac dependency and update PowertoolsSerializer to register as a CRaC Resource
  - Add tests in PowertoolsSerializerTest to assert that the beforeCheckpoint and afterRestore hooks do not throw exceptions
* feat: add Avro priming
* refactor: remove Avro priming
* fix: add space before opening braces

Co-authored-by: Philipp Page <philipp.page@yahoo.de>

* Add powertools-kafka to check build workflows.
* Make input streams auto-closable in PowertoolsSerializerTest.
* Wrap result input stream directly in try clause.
* Fix PMD finding for IOException input stream.

---------

Co-authored-by: Philipp Page <philipp.page@yahoo.de>
Co-authored-by: Philipp Page <pagejep@amazon.com>
1 parent a91c090 commit b923cd0

File tree

6 files changed: +6,806 −39 lines

.github/workflows/check-build.yml
Lines changed: 1 addition & 0 deletions

@@ -23,6 +23,7 @@ on:
       - 'powertools-large-messages/**'
       - 'powertools-logging/**'
       - 'powertools-metrics/**'
+      - 'powertools-kafka/**'
       - 'powertools-parameters/**'
       - 'powertools-serialization/**'
       - 'powertools-sqs/**'

.github/workflows/check-spotbugs.yml
Lines changed: 2 additions & 1 deletion

@@ -20,6 +20,7 @@ on:
       - 'powertools-large-messages/**'
       - 'powertools-logging/**'
       - 'powertools-metrics/**'
+      - 'powertools-kafka/**'
       - 'powertools-parameters/**'
       - 'powertools-serialization/**'
       - 'powertools-sqs/**'
@@ -47,4 +48,4 @@ jobs:
           distribution: 'corretto'
           java-version: 21
       - name: Build with Maven for spotbugs check to mark build as fail if voilations found
-        run: mvn -Pbuild-with-spotbugs -B install --file pom.xml -DskipTests -Dmaven.javadoc.skip=true -Dspotbugs.failOnError=true
+        run: mvn -Pbuild-with-spotbugs -B install --file pom.xml -DskipTests -Dmaven.javadoc.skip=true -Dspotbugs.failOnError=true

powertools-kafka/pom.xml
Lines changed: 25 additions & 0 deletions

@@ -88,6 +88,10 @@
             <artifactId>sdk-core</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.crac</groupId>
+            <artifactId>crac</artifactId>
+        </dependency>

         <!-- Test dependencies -->
         <dependency>
@@ -238,4 +242,25 @@
         </plugins>
     </build>

+    <profiles>
+        <profile>
+            <id>generate-classesloaded-file</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <argLine>
+                                -Xlog:class+load=info:classesloaded.txt
+                                --add-opens java.base/java.util=ALL-UNNAMED
+                                --add-opens java.base/java.lang=ALL-UNNAMED
+                            </argLine>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
 </project>
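
The new generate-classesloaded-file profile only needs to be run when the preload list should be refreshed: executing the tests with mvn test -Pgenerate-classesloaded-file makes the JVM log every loaded class to classesloaded.txt via -Xlog:class+load=info. At checkpoint time, beforeCheckpoint() (see the serializer diff below) hands that list to ClassPreLoader.preloadClasses() from powertools-common. The preloader itself is not part of this diff; the following is only a rough, hypothetical sketch of how such a class-load log could be consumed. The class name, resource location, parsing, and error handling here are assumptions, not the actual powertools-common implementation.

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Illustrative only: the real ClassPreLoader in powertools-common may locate
// and parse the file differently.
public final class ClassesLoadedFilePreloader {

    private static final String RESOURCE = "classesloaded.txt"; // assumed location on the classpath

    public static void preloadClasses() {
        InputStream in = ClassesLoadedFilePreloader.class.getClassLoader().getResourceAsStream(RESOURCE);
        if (in == null) {
            return; // nothing to preload
        }
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // -Xlog:class+load=info lines look like:
                // [0.123s][info][class,load] java.lang.Object source: shared objects file
                if (!line.contains("[class,load]")) {
                    continue;
                }
                String after = line.substring(line.indexOf("[class,load]") + "[class,load]".length()).trim();
                String className = after.split("\\s+")[0];
                try {
                    // Load (and initialize) the class so it is baked into the CRaC checkpoint
                    Class.forName(className, true, ClassesLoadedFilePreloader.class.getClassLoader());
                } catch (Throwable ignored) {
                    // Classes that are unavailable at runtime are simply skipped
                }
            }
        } catch (Exception e) {
            // Priming is best-effort; failures must not break checkpointing
        }
    }

    private ClassesLoadedFilePreloader() {
    }
}

In a sketch like this, failures are deliberately swallowed so that a stale or only partially applicable class list can never break the checkpoint itself.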

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/PowertoolsSerializer.java
Lines changed: 77 additions & 2 deletions

@@ -14,12 +14,20 @@

 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.Map;

 import com.amazonaws.services.lambda.runtime.CustomPojoSerializer;
 import com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory;
+import com.amazonaws.services.lambda.runtime.events.KafkaEvent;

+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.crac.Context;
+import org.crac.Core;
+import org.crac.Resource;
+import software.amazon.lambda.powertools.common.internal.ClassPreLoader;
 import software.amazon.lambda.powertools.kafka.internal.DeserializationUtils;
 import software.amazon.lambda.powertools.kafka.serializers.KafkaAvroDeserializer;
 import software.amazon.lambda.powertools.kafka.serializers.KafkaJsonDeserializer;
@@ -30,11 +38,11 @@
 /**
  * Custom Lambda serializer supporting Kafka events. It delegates to the appropriate deserializer based on the
  * deserialization type specified by {@link software.amazon.lambda.powertools.kafka.Deserialization} annotation.
- *
+ *
  * Kafka serializers need to be specified explicitly, otherwise, the default Lambda serializer from
  * {@link com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory} will be used.
  */
-public class PowertoolsSerializer implements CustomPojoSerializer {
+public class PowertoolsSerializer implements CustomPojoSerializer, Resource {
     private static final Map<DeserializationType, PowertoolsDeserializer> DESERIALIZERS = Map.of(
             DeserializationType.KAFKA_JSON, new KafkaJsonDeserializer(),
             DeserializationType.KAFKA_AVRO, new KafkaAvroDeserializer(),
@@ -43,6 +51,13 @@ DeserializationType.KAFKA_PROTOBUF, new KafkaProtobufDeserializer(),

     private final PowertoolsDeserializer deserializer;

+    private static final PowertoolsSerializer INSTANCE = new PowertoolsSerializer();
+
+    // CRaC registration happens at class loading time
+    static {
+        Core.getGlobalContext().register(INSTANCE);
+    }
+
     public PowertoolsSerializer() {
         this.deserializer = DESERIALIZERS.getOrDefault(
                 DeserializationUtils.determineDeserializationType(),
@@ -64,4 +79,64 @@ public <T> void toJson(T value, OutputStream output, Type type) {
         // This is the Lambda default Output serialization
         JacksonFactory.getInstance().getSerializer(type).toJson(value, output);
     }
+
+    @Override
+    public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
+        JacksonFactory.getInstance().getSerializer(KafkaEvent.class);
+        JacksonFactory.getInstance().getSerializer(ConsumerRecord.class);
+        JacksonFactory.getInstance().getSerializer(String.class);
+
+        DeserializationUtils.determineDeserializationType();
+
+        jsonPriming();
+
+        ClassPreLoader.preloadClasses();
+    }
+
+    @Override
+    public void afterRestore(Context<? extends Resource> context) throws Exception {
+        // No action needed after restore
+    }
+
+    private void jsonPriming() {
+        String kafkaJson = "{\n" +
+                "  \"eventSource\": \"aws:kafka\",\n" +
+                "  \"records\": {\n" +
+                "    \"prime-topic-1\": [\n" +
+                "      {\n" +
+                "        \"topic\": \"prime-topic-1\",\n" +
+                "        \"partition\": 0,\n" +
+                "        \"offset\": 0,\n" +
+                "        \"timestamp\": 1545084650987,\n" +
+                "        \"timestampType\": \"CREATE_TIME\",\n" +
+                "        \"key\": null,\n" +
+                "        \"value\": null,\n" +
+                "        \"headers\": []\n" +
+                "      }\n" +
+                "    ]\n" +
+                "  }\n" +
+                "}";
+        Type consumerRecords = createConsumerRecordsType(String.class, String.class);
+        PowertoolsDeserializer deserializers = DESERIALIZERS.get(DeserializationType.KAFKA_JSON);
+        deserializers.fromJson(kafkaJson, consumerRecords);
+    }
+
+    private Type createConsumerRecordsType(Class<?> keyClass, Class<?> valueClass) {
+        return new ParameterizedType() {
+            @Override
+            public Type[] getActualTypeArguments() {
+                return new Type[] { keyClass, valueClass };
+            }
+
+            @Override
+            public Type getRawType() {
+                return ConsumerRecords.class;
+            }
+
+            @Override
+            public Type getOwnerType() {
+                return null;
+            }
+        };
+    }
 }
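
According to the commit message, PowertoolsSerializerTest (one of the six changed files, not shown above) asserts that the beforeCheckpoint and afterRestore hooks do not throw. A minimal sketch of such an assertion, assuming JUnit 5; the structure and names of the actual test class may differ:

package software.amazon.lambda.powertools.kafka;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

import org.junit.jupiter.api.Test;

// Illustrative sketch only; the actual PowertoolsSerializerTest added by this commit may differ.
class PowertoolsSerializerCracHooksTest {

    @Test
    void beforeCheckpointShouldNotThrow() {
        PowertoolsSerializer serializer = new PowertoolsSerializer();
        // The hook never reads the CRaC context parameter (see diff above), so null is sufficient here
        assertDoesNotThrow(() -> serializer.beforeCheckpoint(null));
    }

    @Test
    void afterRestoreShouldNotThrow() {
        PowertoolsSerializer serializer = new PowertoolsSerializer();
        assertDoesNotThrow(() -> serializer.afterRestore(null));
    }
}

Passing null for the CRaC Context works in this sketch because, as the diff above shows, neither hook dereferences the context parameter.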
