Skip to content

Commit 0844985

Browse files
committed
Fixes #3705
Signed-off-by: Mingyuan Wu <my.wu@outlook.com>
1 parent 2b31a77 commit 0844985

23 files changed

+2303
-57
lines changed

reactor-netty-core/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ dependencies {
6565
api "io.netty:netty-handler:$nettyVersion"
6666
api "io.netty:netty-handler-proxy:$nettyVersion"
6767
api "io.netty:netty-resolver-dns:$nettyVersion"
68+
api "io.netty:netty-pkitesting:$nettyVersion"
6869
if (!"$nettyVersion".endsWithAny("SNAPSHOT")) {
6970
if (osdetector.classifier == "osx-x86_64" || osdetector.classifier == "osx-aarch_64") {
7071
api "io.netty:netty-resolver-dns-native-macos:$nettyVersion$os_suffix"

reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopEpoll.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,11 +19,14 @@
1919

2020
import io.netty.channel.Channel;
2121
import io.netty.channel.EventLoopGroup;
22+
import io.netty.channel.IoEventLoopGroup;
23+
import io.netty.channel.MultiThreadIoEventLoopGroup;
2224
import io.netty.channel.epoll.Epoll;
2325
import io.netty.channel.epoll.EpollDatagramChannel;
2426
import io.netty.channel.epoll.EpollDomainDatagramChannel;
2527
import io.netty.channel.epoll.EpollDomainSocketChannel;
26-
import io.netty.channel.epoll.EpollEventLoopGroup;
28+
import io.netty.channel.epoll.EpollIoHandle;
29+
import io.netty.channel.epoll.EpollIoHandler;
2730
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
2831
import io.netty.channel.epoll.EpollServerSocketChannel;
2932
import io.netty.channel.epoll.EpollSocketChannel;
@@ -90,15 +93,15 @@ public String getName() {
9093

9194
@Override
9295
public EventLoopGroup newEventLoopGroup(int threads, ThreadFactory factory) {
93-
return new EpollEventLoopGroup(threads, factory);
96+
return new MultiThreadIoEventLoopGroup(threads, factory, EpollIoHandler.newFactory());
9497
}
9598

9699
@Override
97100
public boolean supportGroup(EventLoopGroup group) {
98101
if (group instanceof ColocatedEventLoopGroup) {
99102
group = ((ColocatedEventLoopGroup) group).get();
100103
}
101-
return group instanceof EpollEventLoopGroup;
104+
return ((IoEventLoopGroup) group).isCompatible(EpollIoHandle.class);
102105
}
103106

104107
static final Logger log = Loggers.getLogger(DefaultLoopEpoll.class);

reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopKQueue.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018-2022 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2018-2025 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,11 +19,14 @@
1919

2020
import io.netty.channel.Channel;
2121
import io.netty.channel.EventLoopGroup;
22+
import io.netty.channel.IoEventLoopGroup;
23+
import io.netty.channel.MultiThreadIoEventLoopGroup;
2224
import io.netty.channel.kqueue.KQueue;
2325
import io.netty.channel.kqueue.KQueueDatagramChannel;
2426
import io.netty.channel.kqueue.KQueueDomainDatagramChannel;
2527
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
26-
import io.netty.channel.kqueue.KQueueEventLoopGroup;
28+
import io.netty.channel.kqueue.KQueueIoHandle;
29+
import io.netty.channel.kqueue.KQueueIoHandler;
2730
import io.netty.channel.kqueue.KQueueServerDomainSocketChannel;
2831
import io.netty.channel.kqueue.KQueueServerSocketChannel;
2932
import io.netty.channel.kqueue.KQueueSocketChannel;
@@ -89,15 +92,15 @@ public String getName() {
8992

9093
@Override
9194
public EventLoopGroup newEventLoopGroup(int threads, ThreadFactory factory) {
92-
return new KQueueEventLoopGroup(threads, factory);
95+
return new MultiThreadIoEventLoopGroup(threads, factory, KQueueIoHandler.newFactory());
9396
}
9497

9598
@Override
9699
public boolean supportGroup(EventLoopGroup group) {
97100
if (group instanceof ColocatedEventLoopGroup) {
98101
group = ((ColocatedEventLoopGroup) group).get();
99102
}
100-
return group instanceof KQueueEventLoopGroup;
103+
return ((IoEventLoopGroup) group).isCompatible(KQueueIoHandle.class);
101104
}
102105

103106
static final Logger log = Loggers.getLogger(DefaultLoopKQueue.class);

reactor-netty-core/src/main/java/reactor/netty/resources/DefaultLoopResources.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,7 +23,8 @@
2323
import java.util.concurrent.atomic.AtomicReference;
2424

2525
import io.netty.channel.EventLoopGroup;
26-
import io.netty.channel.nio.NioEventLoopGroup;
26+
import io.netty.channel.MultiThreadIoEventLoopGroup;
27+
import io.netty.channel.nio.NioIoHandler;
2728
import io.netty.util.concurrent.FastThreadLocalThread;
2829
import io.netty.util.concurrent.Future;
2930
import reactor.core.publisher.Mono;
@@ -195,8 +196,9 @@ EventLoopGroup cacheNioSelectLoops() {
195196

196197
EventLoopGroup eventLoopGroup = serverSelectLoops.get();
197198
if (null == eventLoopGroup) {
198-
EventLoopGroup newEventLoopGroup = new NioEventLoopGroup(selectCount,
199-
threadFactory(this, "select-nio"));
199+
EventLoopGroup newEventLoopGroup = new MultiThreadIoEventLoopGroup(selectCount,
200+
threadFactory(this, "select-nio"),
201+
NioIoHandler.newFactory());
200202
if (!serverSelectLoops.compareAndSet(null, newEventLoopGroup)) {
201203
//"FutureReturnValueIgnored" this is deliberate
202204
newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
@@ -210,8 +212,9 @@ EventLoopGroup cacheNioSelectLoops() {
210212
EventLoopGroup cacheNioServerLoops() {
211213
EventLoopGroup eventLoopGroup = serverLoops.get();
212214
if (null == eventLoopGroup) {
213-
EventLoopGroup newEventLoopGroup = new NioEventLoopGroup(workerCount,
214-
threadFactory(this, "nio"));
215+
EventLoopGroup newEventLoopGroup = new MultiThreadIoEventLoopGroup(workerCount,
216+
threadFactory(this, "nio"),
217+
NioIoHandler.newFactory());
215218
if (!serverLoops.compareAndSet(null, newEventLoopGroup)) {
216219
//"FutureReturnValueIgnored" this is deliberate
217220
newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);

reactor-netty-core/src/main/java/reactor/netty/tcp/TcpClientSecure.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017-2021 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2017-2025 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
1515
*/
1616
package reactor.netty.tcp;
1717

18+
import io.netty.handler.ssl.SslContextBuilder;
1819
import reactor.util.Logger;
1920
import reactor.util.Loggers;
2021

@@ -35,7 +36,7 @@ final class TcpClientSecure {
3536
try {
3637
sslProvider =
3738
SslProvider.builder()
38-
.sslContext(TcpSslContextSpec.forClient())
39+
.sslContext(SslContextBuilder.forClient().build())
3940
.build();
4041
}
4142
catch (Exception e) {

reactor-netty-core/src/main/java/reactor/netty/tcp/TcpServer.java

+32-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import io.netty.handler.ssl.OpenSsl;
2929
import io.netty.handler.ssl.SslContext;
3030
import io.netty.handler.ssl.util.SelfSignedCertificate;
31+
import io.netty.pkitesting.X509Bundle;
3132
import org.reactivestreams.Publisher;
3233
import reactor.core.publisher.Mono;
3334
import reactor.netty.Connection;
@@ -194,6 +195,21 @@ public TcpServer runOn(LoopResources loopResources, boolean preferNative) {
194195
* }
195196
* </pre>
196197
*
198+
* If {@link X509Bundle} needs to be used, the sample below can be
199+
* used. Note that {@link X509Bundle} should not be used in production.
200+
* <pre>
201+
* {@code
202+
* X509Bundle cert = new CertificateBuilder().rsa2048()
203+
* .subject("CN=testca, OU=dept, O=your-org")
204+
* .setIsCertificateAuthority(true)
205+
* .addSanDnsName("localhost")
206+
* .buildSelfSigned();
207+
* TcpSslContextSpec tcpSslContextSpec =
208+
* TcpSslContextSpec.forServer(cert.toTempCertChainPem(), cert.toTempPrivateKeyPem());
209+
* secure(sslContextSpec -> sslContextSpec.sslContext(tcpSslContextSpec));
210+
* }
211+
* </pre>
212+
*
197213
* @param sslProviderBuilder builder callback for further customization of SslContext.
198214
* @return a new {@link TcpServer}
199215
*/
@@ -220,6 +236,21 @@ public TcpServer secure(Consumer<? super SslProvider.SslContextSpec> sslProvider
220236
* }
221237
* </pre>
222238
*
239+
* If {@link X509Bundle} needs to be used, the sample below can be
240+
* used. Note that {@link X509Bundle} should not be used in production.
241+
* <pre>
242+
* {@code
243+
* X509Bundle cert = new CertificateBuilder().rsa2048()
244+
* .subject("CN=testca, OU=dept, O=your-org")
245+
* .setIsCertificateAuthority(true)
246+
* .addSanDnsName("localhost")
247+
* .buildSelfSigned();
248+
* TcpSslContextSpec tcpSslContextSpec =
249+
* TcpSslContextSpec.forServer(cert.toTempCertChainPem(), cert.toTempPrivateKeyPem());
250+
* secure(sslContextSpec -> sslContextSpec.sslContext(tcpSslContextSpec));
251+
* }
252+
* </pre>
253+
*
223254
* @param sslProvider The provider to set when configuring SSL
224255
*
225256
* @return a new {@link TcpServer}

reactor-netty-core/src/main/java/reactor/netty/udp/UdpClient.java

+18
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.netty.channel.ChannelOption;
2626
import io.netty.channel.EventLoopGroup;
2727
import io.netty.channel.socket.InternetProtocolFamily;
28+
import io.netty.channel.socket.SocketProtocolFamily;
2829
import io.netty.handler.logging.LogLevel;
2930
import io.netty.util.AttributeKey;
3031
import org.jspecify.annotations.Nullable;
@@ -199,7 +200,9 @@ public final UdpClient runOn(LoopResources loopResources, boolean preferNative)
199200
* @param loopResources a new loop resources
200201
* @param family a specific {@link InternetProtocolFamily} to run with
201202
* @return a new {@link UdpClient} reference
203+
* @deprecated Prefer {@link #runOn(LoopResources, SocketProtocolFamily)}
202204
*/
205+
@Deprecated
203206
public final UdpClient runOn(LoopResources loopResources, InternetProtocolFamily family) {
204207
Objects.requireNonNull(loopResources, "loopResources");
205208
Objects.requireNonNull(family, "family");
@@ -208,6 +211,21 @@ public final UdpClient runOn(LoopResources loopResources, InternetProtocolFamily
208211
return dup;
209212
}
210213

214+
/**
215+
* Run IO loops on a supplied {@link EventLoopGroup} from the {@link LoopResources} container.
216+
*
217+
* @param loopResources a new loop resources
218+
* @param socketFamily a specific {@link SocketProtocolFamily} to run with
219+
* @return a new {@link UdpClient} reference
220+
*/
221+
public final UdpClient runOn(LoopResources loopResources, SocketProtocolFamily socketFamily) {
222+
Objects.requireNonNull(loopResources, "loopResources");
223+
Objects.requireNonNull(socketFamily, "socketFamily");
224+
UdpClient dup = super.runOn(loopResources, false);
225+
dup.configuration().socketFamily = socketFamily;
226+
return dup;
227+
}
228+
211229
@Override
212230
public final UdpClient wiretap(boolean enable) {
213231
return super.wiretap(enable);

reactor-netty-core/src/main/java/reactor/netty/udp/UdpClientConfig.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.channel.EventLoopGroup;
2222
import io.netty.channel.socket.DatagramChannel;
2323
import io.netty.channel.socket.InternetProtocolFamily;
24+
import io.netty.channel.socket.SocketProtocolFamily;
2425
import io.netty.channel.socket.nio.NioDatagramChannel;
2526
import io.netty.channel.unix.DomainDatagramChannel;
2627
import io.netty.handler.logging.LogLevel;
@@ -62,10 +63,19 @@ public final ChannelOperations.OnSetup channelOperationsProvider() {
6263
return family;
6364
}
6465

66+
/**
67+
* Return the configured {@link SocketProtocolFamily} to run with or null.
68+
*
69+
* @return the configured {@link SocketProtocolFamily} to run with or null
70+
*/
71+
public final @Nullable SocketProtocolFamily socketFamily() {
72+
return socketFamily;
73+
}
6574

6675
// Protected/Package private write API
6776

6877
@Nullable InternetProtocolFamily family;
78+
@Nullable SocketProtocolFamily socketFamily;
6979

7080
UdpClientConfig(ConnectionProvider connectionProvider, Map<ChannelOption<?>, ?> options,
7181
Supplier<? extends SocketAddress> remoteAddress) {
@@ -88,7 +98,13 @@ protected ChannelFactory<? extends Channel> connectionFactory(EventLoopGroup elg
8898
return super.connectionFactory(elg, isDomainSocket);
8999
}
90100
else {
91-
return () -> new NioDatagramChannel(family());
101+
SocketProtocolFamily socketFamily = socketFamily();
102+
if (socketFamily != null) {
103+
return () -> new NioDatagramChannel(socketFamily);
104+
}
105+
else {
106+
return () -> new NioDatagramChannel(family());
107+
}
92108
}
93109
}
94110

reactor-netty-core/src/main/java/reactor/netty/udp/UdpServer.java

+19
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.netty.channel.ChannelOption;
2626
import io.netty.channel.EventLoopGroup;
2727
import io.netty.channel.socket.InternetProtocolFamily;
28+
import io.netty.channel.socket.SocketProtocolFamily;
2829
import io.netty.handler.logging.LogLevel;
2930
import io.netty.util.AttributeKey;
3031
import org.jspecify.annotations.Nullable;
@@ -250,6 +251,7 @@ public final UdpServer runOn(LoopResources loopResources, boolean preferNative)
250251
Objects.requireNonNull(loopResources, "loopResources");
251252
UdpServer dup = super.runOn(loopResources, preferNative);
252253
dup.configuration().family = null;
254+
dup.configuration().socketFamily = null;
253255
return dup;
254256
}
255257

@@ -259,7 +261,9 @@ public final UdpServer runOn(LoopResources loopResources, boolean preferNative)
259261
* @param loopResources a new loop resources
260262
* @param family a specific {@link InternetProtocolFamily} to run with
261263
* @return a new {@link UdpServer} reference
264+
* @deprecated Prefer {@link #runOn(LoopResources, SocketProtocolFamily)}
262265
*/
266+
@Deprecated
263267
public final UdpServer runOn(LoopResources loopResources, InternetProtocolFamily family) {
264268
Objects.requireNonNull(loopResources, "loopResources");
265269
Objects.requireNonNull(family, "family");
@@ -268,6 +272,21 @@ public final UdpServer runOn(LoopResources loopResources, InternetProtocolFamily
268272
return dup;
269273
}
270274

275+
/**
276+
* Run IO loops on a supplied {@link EventLoopGroup} from the {@link LoopResources} container.
277+
*
278+
* @param loopResources a new loop resources
279+
* @param socketFamily a specific {@link SocketProtocolFamily} to run with
280+
* @return a new {@link UdpServer} reference
281+
*/
282+
public final UdpServer runOn(LoopResources loopResources, SocketProtocolFamily socketFamily) {
283+
Objects.requireNonNull(loopResources, "loopResources");
284+
Objects.requireNonNull(socketFamily, "socketFamily");
285+
UdpServer dup = super.runOn(loopResources, false);
286+
dup.configuration().socketFamily = socketFamily;
287+
return dup;
288+
}
289+
271290
/**
272291
* Based on the actual configuration, returns a {@link Mono} that triggers:
273292
* <ul>

reactor-netty-core/src/main/java/reactor/netty/udp/UdpServerConfig.java

+10
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.channel.group.ChannelGroup;
2323
import io.netty.channel.socket.DatagramChannel;
2424
import io.netty.channel.socket.InternetProtocolFamily;
25+
import io.netty.channel.socket.SocketProtocolFamily;
2526
import io.netty.channel.socket.nio.NioDatagramChannel;
2627
import io.netty.channel.unix.DomainDatagramChannel;
2728
import io.netty.handler.logging.LogLevel;
@@ -92,13 +93,22 @@ public ChannelOperations.OnSetup channelOperationsProvider() {
9293
return family;
9394
}
9495

96+
/**
97+
* Return the configured {@link SocketProtocolFamily} to run with or null.
98+
*
99+
* @return the configured {@link SocketProtocolFamily} to run with or null
100+
*/
101+
public final @Nullable SocketProtocolFamily socketFamily() {
102+
return socketFamily;
103+
}
95104

96105
// Protected/Package private write API
97106

98107
@Nullable Consumer<? super UdpServerConfig> doOnBind;
99108
@Nullable Consumer<? super Connection> doOnBound;
100109
@Nullable Consumer<? super Connection> doOnUnbound;
101110
@Nullable InternetProtocolFamily family;
111+
@Nullable SocketProtocolFamily socketFamily;
102112

103113
UdpServerConfig(Map<ChannelOption<?>, ?> options, Supplier<? extends SocketAddress> bindAddress) {
104114
super(options, bindAddress);

0 commit comments

Comments
 (0)