|
16 | 16 | use Interop\Queue\Queue; |
17 | 17 | use Interop\Queue\SubscriptionConsumer; |
18 | 18 | use Interop\Queue\Topic; |
19 | | -use InvalidArgumentException; |
20 | 19 | use RdKafka\Conf; |
21 | 20 | use RdKafka\KafkaConsumer; |
22 | 21 | use RdKafka\Producer as VendorProducer; |
@@ -58,34 +57,6 @@ public function __construct(array $config) |
58 | 57 | $this->configureSerializer($config); |
59 | 58 | } |
60 | 59 |
|
61 | | - /** |
62 | | - * @param array $config |
63 | | - * @return void |
64 | | - */ |
65 | | - private function configureSerializer(array $config): void |
66 | | - { |
67 | | - if (!isset($config['serializer'])) { |
68 | | - $this->setSerializer(new JsonSerializer()); |
69 | | - return; |
70 | | - } |
71 | | - |
72 | | - if (is_string($config['serializer'])) { |
73 | | - $this->setSerializer(new $config['serializer']()); |
74 | | - } elseif (is_array($config['serializer']) && isset($config['serializer']['class'])) { |
75 | | - $serializerClass = $config['serializer']['class']; |
76 | | - $serializerOptions = $config['serializer']['options'] ?? []; |
77 | | - if (!empty($serializerOptions)) { |
78 | | - $this->setSerializer(new $serializerClass($serializerOptions)); |
79 | | - } else { |
80 | | - $this->setSerializer(new $serializerClass()); |
81 | | - } |
82 | | - } elseif ($config['serializer'] instanceof Serializer) { |
83 | | - $this->setSerializer($config['serializer']); |
84 | | - } else { |
85 | | - throw new InvalidArgumentException('Invalid serializer configuration'); |
86 | | - } |
87 | | - } |
88 | | - |
89 | 60 | /** |
90 | 61 | * @return RdKafkaMessage |
91 | 62 | */ |
@@ -208,6 +179,58 @@ public static function getLibrdKafkaVersion(): string |
208 | 179 | return "$major.$minor.$patch"; |
209 | 180 | } |
210 | 181 |
|
| 182 | + /** |
| 183 | + * @return void |
| 184 | + * JsonSerializer should be the default fallback if no serializer is specified |
| 185 | + */ |
| 186 | + private function configureSerializer(array $config): void |
| 187 | + { |
| 188 | + if (!isset($config['serializer'])) { |
| 189 | + $this->setSerializer(new JsonSerializer()); |
| 190 | + |
| 191 | + return; |
| 192 | + } |
| 193 | + |
| 194 | + $serializer = $config['serializer']; |
| 195 | + |
| 196 | + if ($serializer instanceof Serializer) { |
| 197 | + $this->setSerializer($serializer); |
| 198 | + |
| 199 | + return; |
| 200 | + } |
| 201 | + |
| 202 | + $serializerClass = $this->resolveSerializerClass($serializer); |
| 203 | + |
| 204 | + if (!class_exists($serializerClass) || !is_a($serializerClass, Serializer::class, true)) { |
| 205 | + throw $this->createInvalidSerializerException($serializerClass); |
| 206 | + } |
| 207 | + |
| 208 | + $serializerOptions = $serializer['options'] ?? []; |
| 209 | + $this->setSerializer(new $serializerClass($serializerOptions)); |
| 210 | + } |
| 211 | + |
| 212 | + private function resolveSerializerClass(mixed $serializer): string |
| 213 | + { |
| 214 | + if (is_string($serializer)) { |
| 215 | + return $serializer; |
| 216 | + } |
| 217 | + |
| 218 | + if (is_array($serializer) && isset($serializer['class'])) { |
| 219 | + return $serializer['class']; |
| 220 | + } |
| 221 | + |
| 222 | + throw $this->createInvalidSerializerException($serializer); |
| 223 | + } |
| 224 | + |
| 225 | + private function createInvalidSerializerException(mixed $value): \InvalidArgumentException |
| 226 | + { |
| 227 | + return new \InvalidArgumentException(sprintf( |
| 228 | + 'Invalid serializer configuration. Expected "serializer" to be a string, an array with a "class" key, or a %s instance. Received %s instead.', |
| 229 | + Serializer::class, |
| 230 | + get_debug_type($value) |
| 231 | + )); |
| 232 | + } |
| 233 | + |
211 | 234 | private function getConf(): Conf |
212 | 235 | { |
213 | 236 | if (null === $this->conf) { |
|
0 commit comments