Skip to content

Commit 247165d

Browse files
authored
Feature/routing (#55)
1 parent fa269a4 commit 247165d

File tree

46 files changed

+1140
-502
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1140
-502
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ group=io.rsocket.rpc
22
version=0.3.0
33

44
reactorBomVersion=Dysprosium-RELEASE
5-
rsocketVersion=1.0.0-RC6-SNAPSHOT
5+
rsocketVersion=1.0.0-RC6-bugfix-prioritization-SNAPSHOT
66
graphqlVersion=11.0
77
protobufVersion=3.7.1
88
log4jVersion=2.12.1

rsocket-ipc-core/src/main/java/io/rsocket/ipc/IPCServerRSocket.java

Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -17,61 +17,31 @@
1717

1818
import io.opentracing.Tracer;
1919
import io.rsocket.Payload;
20-
import io.rsocket.ipc.util.IPCChannelFunction;
21-
import io.rsocket.ipc.util.IPCFunction;
22-
import java.util.Map;
20+
import io.rsocket.ipc.routing.SimpleRouter;
2321
import reactor.core.publisher.Flux;
24-
import reactor.core.publisher.Mono;
2522

2623
@SuppressWarnings("unchecked")
2724
class IPCServerRSocket extends RoutingServerRSocket implements IPCRSocket, SelfRegistrable {
2825

2926
private final String service;
27+
private final SimpleRouter simpleRouter;
3028

31-
public IPCServerRSocket(
32-
String service,
33-
Map<String, IPCFunction<Mono<Void>>> fireAndForgetRegistry,
34-
Map<String, IPCFunction<Mono<Payload>>> requestResponseRegistry,
35-
Map<String, IPCFunction<Flux<Payload>>> requestStreamRegistry,
36-
Map<String, IPCChannelFunction> requestChannelRegistry) {
37-
super(
38-
fireAndForgetRegistry,
39-
requestResponseRegistry,
40-
requestStreamRegistry,
41-
requestChannelRegistry);
29+
public IPCServerRSocket(String service, SimpleRouter simpleRouter) {
30+
super(simpleRouter);
4231
this.service = service;
32+
this.simpleRouter = simpleRouter;
4333
}
4434

45-
public IPCServerRSocket(
46-
String service,
47-
Tracer tracer,
48-
Map<String, IPCFunction<Mono<Void>>> fireAndForgetRegistry,
49-
Map<String, IPCFunction<Mono<Payload>>> requestResponseRegistry,
50-
Map<String, IPCFunction<Flux<Payload>>> requestStreamRegistry,
51-
Map<String, IPCChannelFunction> requestChannelRegistry) {
52-
super(
53-
tracer,
54-
fireAndForgetRegistry,
55-
requestResponseRegistry,
56-
requestStreamRegistry,
57-
requestChannelRegistry);
35+
public IPCServerRSocket(String service, Tracer tracer, SimpleRouter simpleRouter) {
36+
super(tracer, simpleRouter);
5837
this.service = service;
38+
this.simpleRouter = simpleRouter;
5939
}
6040

61-
public IPCServerRSocket(
62-
String service,
63-
MetadataDecoder decoder,
64-
Map<String, IPCFunction<Mono<Void>>> fireAndForgetRegistry,
65-
Map<String, IPCFunction<Mono<Payload>>> requestResponseRegistry,
66-
Map<String, IPCFunction<Flux<Payload>>> requestStreamRegistry,
67-
Map<String, IPCChannelFunction> requestChannelRegistry) {
68-
super(
69-
decoder,
70-
fireAndForgetRegistry,
71-
requestResponseRegistry,
72-
requestStreamRegistry,
73-
requestChannelRegistry);
41+
public IPCServerRSocket(String service, MetadataDecoder decoder, SimpleRouter simpleRouter) {
42+
super(decoder, simpleRouter);
7443
this.service = service;
44+
this.simpleRouter = simpleRouter;
7545
}
7646

7747
@Override
@@ -80,15 +50,11 @@ public String getService() {
8050
}
8151

8252
@Override
83-
public void selfRegister(
84-
Map<String, IPCFunction<Mono<Void>>> fireAndForgetRegistry,
85-
Map<String, IPCFunction<Mono<Payload>>> requestResponseRegistry,
86-
Map<String, IPCFunction<Flux<Payload>>> requestStreamRegistry,
87-
Map<String, IPCChannelFunction> requestChannelRegistry) {
88-
fireAndForgetRegistry.putAll(this.fireAndForgetRegistry);
89-
requestResponseRegistry.putAll(this.requestResponseRegistry);
90-
requestStreamRegistry.putAll(this.requestStreamRegistry);
91-
requestChannelRegistry.putAll(this.requestChannelRegistry);
53+
public void selfRegister(MutableRouter mutableRouter) {
54+
simpleRouter.getFireAndForgetRegistry().forEach(mutableRouter::withFireAndForgetRoute);
55+
simpleRouter.getRequestResponseRegistry().forEach(mutableRouter::withRequestResponseRoute);
56+
simpleRouter.getRequestStreamRegistry().forEach(mutableRouter::withRequestStreamRoute);
57+
simpleRouter.getRequestChannelRegistry().forEach(mutableRouter::withRequestChannelRoute);
9258
}
9359

9460
@Override
Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,35 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.rsocket.ipc;
217

318
import io.netty.buffer.ByteBuf;
419
import io.opentracing.SpanContext;
5-
import io.rsocket.Payload;
620

721
@FunctionalInterface
822
public interface MetadataDecoder {
923

10-
<RESULT> RESULT decode(Payload payload, Handler<RESULT> transformer) throws Exception;
24+
Metadata decode(ByteBuf metadataByteBuf) throws Exception;
1125

12-
interface Handler<RESULT> {
13-
RESULT handleAndReply(ByteBuf data, ByteBuf metadata, String route, SpanContext spanContext)
14-
throws Exception;
26+
interface Metadata {
27+
ByteBuf metadata();
28+
29+
String route();
30+
31+
SpanContext spanContext();
32+
33+
boolean isComposite();
1534
}
1635
}

rsocket-ipc-core/src/main/java/io/rsocket/ipc/MetadataEncoder.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.rsocket.ipc;
217

318
import io.netty.buffer.ByteBuf;
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.ipc;
17+
18+
import io.rsocket.Payload;
19+
import io.rsocket.ipc.util.IPCChannelFunction;
20+
import io.rsocket.ipc.util.IPCFunction;
21+
import reactor.core.publisher.Flux;
22+
import reactor.core.publisher.Mono;
23+
24+
public interface MutableRouter<SELF extends MutableRouter<SELF>> extends Router {
25+
SELF withFireAndForgetRoute(String route, IPCFunction<Mono<Void>> function);
26+
27+
SELF withRequestResponseRoute(String route, IPCFunction<Mono<Payload>> function);
28+
29+
SELF withRequestStreamRoute(String route, IPCFunction<Flux<Payload>> function);
30+
31+
SELF withRequestChannelRoute(String route, IPCChannelFunction function);
32+
}
Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,40 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.rsocket.ipc;
217

318
import io.opentracing.Tracer;
419
import io.rsocket.ResponderRSocket;
5-
import java.util.concurrent.ConcurrentHashMap;
20+
import io.rsocket.ipc.routing.SimpleRouter;
621

722
public class RequestHandlingRSocket extends RoutingServerRSocket implements ResponderRSocket {
823

924
public RequestHandlingRSocket() {
10-
super(
11-
new ConcurrentHashMap<>(),
12-
new ConcurrentHashMap<>(),
13-
new ConcurrentHashMap<>(),
14-
new ConcurrentHashMap<>());
25+
super(new SimpleRouter());
1526
}
1627

1728
public RequestHandlingRSocket(Tracer tracer) {
18-
super(
19-
tracer,
20-
new ConcurrentHashMap<>(),
21-
new ConcurrentHashMap<>(),
22-
new ConcurrentHashMap<>(),
23-
new ConcurrentHashMap<>());
29+
super(tracer, new SimpleRouter());
2430
}
2531

2632
public RequestHandlingRSocket(MetadataDecoder decoder) {
27-
super(
28-
decoder,
29-
new ConcurrentHashMap<>(),
30-
new ConcurrentHashMap<>(),
31-
new ConcurrentHashMap<>(),
32-
new ConcurrentHashMap<>());
33+
super(decoder, new SimpleRouter());
3334
}
3435

3536
public RequestHandlingRSocket withEndpoint(SelfRegistrable selfRegistrable) {
36-
selfRegistrable.selfRegister(
37-
fireAndForgetRegistry,
38-
requestResponseRegistry,
39-
requestStreamRegistry,
40-
requestChannelRegistry);
37+
selfRegistrable.selfRegister((MutableRouter) router);
4138
return this;
4239
}
4340
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.ipc;
17+
18+
import io.rsocket.Payload;
19+
import io.rsocket.ipc.util.IPCChannelFunction;
20+
import io.rsocket.ipc.util.IPCFunction;
21+
import reactor.core.publisher.Flux;
22+
import reactor.core.publisher.Mono;
23+
24+
public interface Router {
25+
26+
IPCFunction<Mono<Void>> routeFireAndForget(String route);
27+
28+
IPCFunction<Mono<Payload>> routeRequestResponse(String route);
29+
30+
IPCFunction<Flux<Payload>> routeRequestStream(String route);
31+
32+
IPCChannelFunction routeRequestChannel(String route);
33+
}

0 commit comments

Comments
 (0)