@@ -48,53 +48,46 @@ class TCPServiceFrameParser(val vertx: Vertx, val socket: NetSocket) : Handler<A
4848 val frame = event.result()
4949 log.trace(" Received frame: {}" , frame)
5050
51- // todo: revisit this || after fixing below todo
52- if (" message" == frame.getString(" type" ) || " send" == frame.getString(" type" )) {
53- if (frame.getString(" replyAddress" ) != null ) {
54- val deliveryOptions = DeliveryOptions ()
55- frame.getJsonObject(" headers" ).fieldNames().forEach {
56- deliveryOptions.addHeader(it, frame.getJsonObject(" headers" ).getString(it))
57- }
58- vertx.eventBus().request<Any >(
59- frame.getString(" address" ), frame.getJsonObject(" body" ), deliveryOptions
60- ).onComplete {
61- if (it.succeeded()) {
62- FrameHelper .sendFrame(
63- BridgeEventType .SEND .name.lowercase(),
64- frame.getString(" replyAddress" ),
65- it.result().body(),
66- socket
67- )
68- } else {
69- val replyException = it.cause() as ReplyException
70- FrameHelper .writeFrame(
71- JsonObject ()
72- .put(" type" , BridgeEventType .SEND .name.lowercase())
73- .put(" address" , frame.getString(" replyAddress" ))
74- .put(" failureCode" , replyException.failureCode())
75- .put(" failureType" , replyException.failureType().name)
76- .put(" message" , replyException.message),
77- socket
78- )
79- }
80- }
81- } else {
82- val body = frame.getValue(" body" )
83- if (body is JsonObject ) {
84- if (body.getString(" message" )?.startsWith(" EventBusException:" ) == true ) {
85- handleErrorFrame(body.put(" address" , frame.getString(" address" )))
86- } else {
87- vertx.eventBus().send(frame.getString(" address" ), body)
88- }
51+ if (frame.getString(" replyAddress" ) != null ) {
52+ val deliveryOptions = DeliveryOptions ()
53+ frame.getJsonObject(" headers" ).fieldNames().forEach {
54+ deliveryOptions.addHeader(it, frame.getJsonObject(" headers" ).getString(it))
55+ }
56+ vertx.eventBus().request<Any >(
57+ frame.getString(" address" ), frame.getJsonObject(" body" ), deliveryOptions
58+ ).onComplete {
59+ if (it.succeeded()) {
60+ FrameHelper .sendFrame(
61+ BridgeEventType .SEND .name.lowercase(),
62+ frame.getString(" replyAddress" ),
63+ it.result().body(),
64+ socket
65+ )
8966 } else {
90- vertx.eventBus().send(frame.getString(" address" ), body)
67+ val replyException = it.cause() as ReplyException
68+ FrameHelper .writeFrame(
69+ JsonObject ()
70+ .put(" type" , BridgeEventType .SEND .name.lowercase())
71+ .put(" address" , frame.getString(" replyAddress" ))
72+ .put(" failureCode" , replyException.failureCode())
73+ .put(" failureType" , replyException.failureType().name)
74+ .put(" message" , replyException.message),
75+ socket
76+ )
9177 }
9278 }
93- } else if (" err" == frame.getString(" type" )) {
79+ } else if (frame.getString(" address" ) != null ) {
80+ val body = frame.getValue(" body" )
81+ if (body == null && frame.getString(" message" )?.startsWith(" EventBusException:" ) == true ) {
82+ handleErrorFrame(frame)
83+ } else if (body is JsonObject && body.getString(" message" )?.startsWith(" EventBusException:" ) == true ) {
84+ handleErrorFrame(body.put(" address" , frame.getString(" address" )))
85+ } else {
86+ vertx.eventBus().send(frame.getString(" address" ), body)
87+ }
88+ } else {
9489 // directly thrown event bus exceptions
9590 throw ReplyException (ReplyFailure .ERROR , frame.getString(" message" ))
96- } else {
97- throw UnsupportedOperationException (frame.toString())
9891 }
9992 }
10093
0 commit comments