Skip to content

Fixes #3705 #3706

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: bum-netty-version-to-4_2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Check Netty SNAPSHOTS
name: Check Reactor Netty 1.2.x with Netty 4.1 SNAPSHOTS

on:
schedule:
Expand All @@ -23,6 +23,8 @@ jobs:

steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
with:
ref: '1.2.x'
- name: Set up JDK 1.8
uses: actions/setup-java@3a4f6e1af504cf6a31855fa899c6aa5355ba6c12
with:
Expand Down
39 changes: 39 additions & 0 deletions .github/workflows/check_netty_4_2_snapshots.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: Check Reactor Netty 1.3.x with Netty 4.2 SNAPSHOTS

on:
schedule:
- cron: "0 14 * * *"
permissions: read-all
jobs:
build:

runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-22.04, macos-13, windows-2022]
transport: [native, nio]
exclude:
# excludes native on Windows (there's none)
- os: windows-2022
transport: native
# macOS - https://github.com/netty/netty/issues/9689
- os: macos-13
transport: native

steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- name: Set up JDK 1.8
uses: actions/setup-java@3a4f6e1af504cf6a31855fa899c6aa5355ba6c12
with:
distribution: 'temurin'
java-version: '8'
- name: Set up JDK 17
uses: actions/setup-java@3a4f6e1af504cf6a31855fa899c6aa5355ba6c12
with:
distribution: 'graalvm'
java-version: '17.0.12'
- name: Build with Gradle
run: ./gradlew clean check -x :reactor-netty-core:java17Test --no-daemon -PforceTransport=${{ matrix.transport }} -PforceNettyVersion='4.2.1.Final-SNAPSHOT'
- name: GraalVM smoke tests
run: ./gradlew :reactor-netty-graalvm-smoke-tests:nativeTest --no-daemon -PforceTransport=${{ matrix.transport }} -PforceNettyVersion='4.2.1.Final-SNAPSHOT'
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ ext {
logbackVersion = '1.2.13'

// Netty
nettyDefaultVersion = '4.1.119.Final'
nettyDefaultVersion = '4.2.0.Final'
if (!project.hasProperty("forceNettyVersion")) {
nettyVersion = nettyDefaultVersion
}
Expand Down
1 change: 1 addition & 0 deletions reactor-netty-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ dependencies {
api "io.netty:netty-handler:$nettyVersion"
api "io.netty:netty-handler-proxy:$nettyVersion"
api "io.netty:netty-resolver-dns:$nettyVersion"
api "io.netty:netty-pkitesting:$nettyVersion"
if (!"$nettyVersion".endsWithAny("SNAPSHOT")) {
if (osdetector.classifier == "osx-x86_64" || osdetector.classifier == "osx-aarch_64") {
api "io.netty:netty-resolver-dns-native-macos:$nettyVersion$os_suffix"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,11 +19,14 @@

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.IoEventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollDomainDatagramChannel;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollIoHandle;
import io.netty.channel.epoll.EpollIoHandler;
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
Expand Down Expand Up @@ -90,15 +93,15 @@ public String getName() {

@Override
public EventLoopGroup newEventLoopGroup(int threads, ThreadFactory factory) {
return new EpollEventLoopGroup(threads, factory);
return new MultiThreadIoEventLoopGroup(threads, factory, EpollIoHandler.newFactory());
}

@Override
public boolean supportGroup(EventLoopGroup group) {
if (group instanceof ColocatedEventLoopGroup) {
group = ((ColocatedEventLoopGroup) group).get();
}
return group instanceof EpollEventLoopGroup;
return ((IoEventLoopGroup) group).isCompatible(EpollIoHandle.class);
}

static final Logger log = Loggers.getLogger(DefaultLoopEpoll.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2025 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,11 +19,14 @@

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.IoEventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDatagramChannel;
import io.netty.channel.kqueue.KQueueDomainDatagramChannel;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueIoHandle;
import io.netty.channel.kqueue.KQueueIoHandler;
import io.netty.channel.kqueue.KQueueServerDomainSocketChannel;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.kqueue.KQueueSocketChannel;
Expand Down Expand Up @@ -89,15 +92,15 @@ public String getName() {

@Override
public EventLoopGroup newEventLoopGroup(int threads, ThreadFactory factory) {
return new KQueueEventLoopGroup(threads, factory);
return new MultiThreadIoEventLoopGroup(threads, factory, KQueueIoHandler.newFactory());
}

@Override
public boolean supportGroup(EventLoopGroup group) {
if (group instanceof ColocatedEventLoopGroup) {
group = ((ColocatedEventLoopGroup) group).get();
}
return group instanceof KQueueEventLoopGroup;
return ((IoEventLoopGroup) group).isCompatible(KQueueIoHandle.class);
}

static final Logger log = Loggers.getLogger(DefaultLoopKQueue.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,8 @@
import java.util.concurrent.atomic.AtomicReference;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.nio.NioIoHandler;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.concurrent.Future;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -195,8 +196,9 @@ EventLoopGroup cacheNioSelectLoops() {

EventLoopGroup eventLoopGroup = serverSelectLoops.get();
if (null == eventLoopGroup) {
EventLoopGroup newEventLoopGroup = new NioEventLoopGroup(selectCount,
threadFactory(this, "select-nio"));
EventLoopGroup newEventLoopGroup = new MultiThreadIoEventLoopGroup(selectCount,
threadFactory(this, "select-nio"),
NioIoHandler.newFactory());
if (!serverSelectLoops.compareAndSet(null, newEventLoopGroup)) {
//"FutureReturnValueIgnored" this is deliberate
newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
Expand All @@ -210,8 +212,9 @@ EventLoopGroup cacheNioSelectLoops() {
EventLoopGroup cacheNioServerLoops() {
EventLoopGroup eventLoopGroup = serverLoops.get();
if (null == eventLoopGroup) {
EventLoopGroup newEventLoopGroup = new NioEventLoopGroup(workerCount,
threadFactory(this, "nio"));
EventLoopGroup newEventLoopGroup = new MultiThreadIoEventLoopGroup(workerCount,
threadFactory(this, "nio"),
NioIoHandler.newFactory());
if (!serverLoops.compareAndSet(null, newEventLoopGroup)) {
//"FutureReturnValueIgnored" this is deliberate
newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2025 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
*/
package reactor.netty.tcp;

import io.netty.handler.ssl.SslContextBuilder;
import reactor.util.Logger;
import reactor.util.Loggers;

Expand All @@ -35,7 +36,7 @@ final class TcpClientSecure {
try {
sslProvider =
SslProvider.builder()
.sslContext(TcpSslContextSpec.forClient())
.sslContext(SslContextBuilder.forClient().build())
.build();
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.pkitesting.X509Bundle;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
Expand Down Expand Up @@ -194,6 +195,21 @@ public TcpServer runOn(LoopResources loopResources, boolean preferNative) {
* }
* </pre>
*
* If {@link X509Bundle} needs to be used, the sample below can be
* used. Note that {@link X509Bundle} should not be used in production.
* <pre>
* {@code
* X509Bundle cert = new CertificateBuilder().rsa2048()
* .subject("CN=testca, OU=dept, O=your-org")
* .setIsCertificateAuthority(true)
* .addSanDnsName("localhost")
* .buildSelfSigned();
* TcpSslContextSpec tcpSslContextSpec =
* TcpSslContextSpec.forServer(cert.toTempCertChainPem(), cert.toTempPrivateKeyPem());
* secure(sslContextSpec -> sslContextSpec.sslContext(tcpSslContextSpec));
* }
* </pre>
*
* @param sslProviderBuilder builder callback for further customization of SslContext.
* @return a new {@link TcpServer}
*/
Expand All @@ -220,6 +236,21 @@ public TcpServer secure(Consumer<? super SslProvider.SslContextSpec> sslProvider
* }
* </pre>
*
* If {@link X509Bundle} needs to be used, the sample below can be
* used. Note that {@link X509Bundle} should not be used in production.
* <pre>
* {@code
* X509Bundle cert = new CertificateBuilder().rsa2048()
* .subject("CN=testca, OU=dept, O=your-org")
* .setIsCertificateAuthority(true)
* .addSanDnsName("localhost")
* .buildSelfSigned();
* TcpSslContextSpec tcpSslContextSpec =
* TcpSslContextSpec.forServer(cert.toTempCertChainPem(), cert.toTempPrivateKeyPem());
* secure(sslContextSpec -> sslContextSpec.sslContext(tcpSslContextSpec));
* }
* </pre>
*
* @param sslProvider The provider to set when configuring SSL
*
* @return a new {@link TcpServer}
Expand Down
18 changes: 18 additions & 0 deletions reactor-netty-core/src/main/java/reactor/netty/udp/UdpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.SocketProtocolFamily;
import io.netty.handler.logging.LogLevel;
import io.netty.util.AttributeKey;
import org.jspecify.annotations.Nullable;
Expand Down Expand Up @@ -199,7 +200,9 @@ public final UdpClient runOn(LoopResources loopResources, boolean preferNative)
* @param loopResources a new loop resources
* @param family a specific {@link InternetProtocolFamily} to run with
* @return a new {@link UdpClient} reference
* @deprecated Prefer {@link #runOn(LoopResources, SocketProtocolFamily)}
*/
@Deprecated
public final UdpClient runOn(LoopResources loopResources, InternetProtocolFamily family) {
Objects.requireNonNull(loopResources, "loopResources");
Objects.requireNonNull(family, "family");
Expand All @@ -208,6 +211,21 @@ public final UdpClient runOn(LoopResources loopResources, InternetProtocolFamily
return dup;
}

/**
* Run IO loops on a supplied {@link EventLoopGroup} from the {@link LoopResources} container.
*
* @param loopResources a new loop resources
* @param socketFamily a specific {@link SocketProtocolFamily} to run with
* @return a new {@link UdpClient} reference
*/
public final UdpClient runOn(LoopResources loopResources, SocketProtocolFamily socketFamily) {
Objects.requireNonNull(loopResources, "loopResources");
Objects.requireNonNull(socketFamily, "socketFamily");
UdpClient dup = super.runOn(loopResources, false);
dup.configuration().socketFamily = socketFamily;
return dup;
}

@Override
public final UdpClient wiretap(boolean enable) {
return super.wiretap(enable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.SocketProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.unix.DomainDatagramChannel;
import io.netty.handler.logging.LogLevel;
Expand Down Expand Up @@ -62,10 +63,19 @@ public final ChannelOperations.OnSetup channelOperationsProvider() {
return family;
}

/**
* Return the configured {@link SocketProtocolFamily} to run with or null.
*
* @return the configured {@link SocketProtocolFamily} to run with or null
*/
public final @Nullable SocketProtocolFamily socketFamily() {
return socketFamily;
}

// Protected/Package private write API

@Nullable InternetProtocolFamily family;
@Nullable SocketProtocolFamily socketFamily;

UdpClientConfig(ConnectionProvider connectionProvider, Map<ChannelOption<?>, ?> options,
Supplier<? extends SocketAddress> remoteAddress) {
Expand All @@ -88,7 +98,13 @@ protected ChannelFactory<? extends Channel> connectionFactory(EventLoopGroup elg
return super.connectionFactory(elg, isDomainSocket);
}
else {
return () -> new NioDatagramChannel(family());
SocketProtocolFamily socketFamily = socketFamily();
if (socketFamily != null) {
return () -> new NioDatagramChannel(socketFamily);
}
else {
return () -> new NioDatagramChannel(family());
}
}
}

Expand Down
Loading
Loading