Skip to content

Add MacOS Workflow and IO_URING Transport #1834

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ on:
default: 'GitHub Actions'

jobs:
Build:
RunOnLinux:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
Expand All @@ -25,3 +25,16 @@ jobs:
java-version: '11'
- name: Run Tests
run: ./mvnw -B -ntp clean test

RunOnMacOs:
runs-on: macos-latest
steps:
- uses: actions/checkout@v3
- name: Grant Permission
run: sudo chmod +x ./mvnw
- uses: actions/setup-java@v3
with:
distribution: 'corretto'
java-version: '11'
- name: Run Tests
run: ./mvnw -B -ntp clean test
8 changes: 8 additions & 0 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,13 @@
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/io.github.artsok/rerunner-jupiter -->
<dependency>
<groupId>io.github.artsok</groupId>
<artifactId>rerunner-jupiter</artifactId>
<version>2.1.6</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ public interface AsyncHttpClientConfig {

boolean isUseNativeTransport();

boolean isUseOnlyEpollNativeTransport();

Consumer<Channel> getHttpAdditionalChannelInitializer();

Consumer<Channel> getWsAdditionalChannelInitializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public DefaultAsyncHttpClient(AsyncHttpClientConfig config) {
}
}

// Visible for testing
ChannelManager channelManager() {
return channelManager;
}

private static Timer newNettyTimer(AsyncHttpClientConfig config) {
ThreadFactory threadFactory = config.getThreadFactory() != null ? config.getThreadFactory() : new DefaultThreadFactory(config.getThreadPoolName() + "-timer");
HashedWheelTimer timer = new HashedWheelTimer(threadFactory, config.getHashedWheelTimerTickDuration(), TimeUnit.MILLISECONDS, config.getHashedWheelTimerSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultUseInsecureTrustManager;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultUseLaxCookieEncoder;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultUseNativeTransport;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultUseOnlyEpollNativeTransport;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultUseOpenSsl;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultUseProxyProperties;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultUseProxySelector;
Expand Down Expand Up @@ -179,6 +180,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
private final Map<ChannelOption<Object>, Object> channelOptions;
private final EventLoopGroup eventLoopGroup;
private final boolean useNativeTransport;
private final boolean useOnlyEpollNativeTransport;
private final ByteBufAllocator allocator;
private final boolean tcpNoDelay;
private final boolean soReuseAddress;
Expand Down Expand Up @@ -273,6 +275,7 @@ private DefaultAsyncHttpClientConfig(// http
Map<ChannelOption<Object>, Object> channelOptions,
EventLoopGroup eventLoopGroup,
boolean useNativeTransport,
boolean useOnlyEpollNativeTransport,
ByteBufAllocator allocator,
Timer nettyTimer,
ThreadFactory threadFactory,
Expand Down Expand Up @@ -363,6 +366,12 @@ private DefaultAsyncHttpClientConfig(// http
this.channelOptions = channelOptions;
this.eventLoopGroup = eventLoopGroup;
this.useNativeTransport = useNativeTransport;
this.useOnlyEpollNativeTransport = useOnlyEpollNativeTransport;

if (useOnlyEpollNativeTransport && !useNativeTransport) {
throw new IllegalArgumentException("Native Transport must be enabled to use Epoll Native Transport only");
}

this.allocator = allocator;
this.nettyTimer = nettyTimer;
this.threadFactory = threadFactory;
Expand Down Expand Up @@ -703,6 +712,11 @@ public boolean isUseNativeTransport() {
return useNativeTransport;
}

@Override
public boolean isUseOnlyEpollNativeTransport() {
return useOnlyEpollNativeTransport;
}

@Override
public ByteBufAllocator getAllocator() {
return allocator;
Expand Down Expand Up @@ -832,6 +846,7 @@ public static class Builder {
private int httpClientCodecInitialBufferSize = defaultHttpClientCodecInitialBufferSize();
private int chunkedFileChunkSize = defaultChunkedFileChunkSize();
private boolean useNativeTransport = defaultUseNativeTransport();
private boolean useOnlyEpollNativeTransport = defaultUseOnlyEpollNativeTransport();
private ByteBufAllocator allocator;
private final Map<ChannelOption<Object>, Object> channelOptions = new HashMap<>();
private EventLoopGroup eventLoopGroup;
Expand Down Expand Up @@ -918,6 +933,8 @@ public Builder(AsyncHttpClientConfig config) {
channelOptions.putAll(config.getChannelOptions());
eventLoopGroup = config.getEventLoopGroup();
useNativeTransport = config.isUseNativeTransport();
useOnlyEpollNativeTransport = config.isUseOnlyEpollNativeTransport();

allocator = config.getAllocator();
nettyTimer = config.getNettyTimer();
threadFactory = config.getThreadFactory();
Expand Down Expand Up @@ -1309,6 +1326,11 @@ public Builder setUseNativeTransport(boolean useNativeTransport) {
return this;
}

public Builder setUseOnlyEpollNativeTransport(boolean useOnlyEpollNativeTransport) {
this.useOnlyEpollNativeTransport = useOnlyEpollNativeTransport;
return this;
}

public Builder setAllocator(ByteBufAllocator allocator) {
this.allocator = allocator;
return this;
Expand Down Expand Up @@ -1426,6 +1448,7 @@ public DefaultAsyncHttpClientConfig build() {
channelOptions.isEmpty() ? Collections.emptyMap() : Collections.unmodifiableMap(channelOptions),
eventLoopGroup,
useNativeTransport,
useOnlyEpollNativeTransport,
allocator,
nettyTimer,
threadFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public final class AsyncHttpClientConfigDefaults {
public static final String SHUTDOWN_QUIET_PERIOD_CONFIG = "shutdownQuietPeriod";
public static final String SHUTDOWN_TIMEOUT_CONFIG = "shutdownTimeout";
public static final String USE_NATIVE_TRANSPORT_CONFIG = "useNativeTransport";
public static final String USE_ONLY_EPOLL_NATIVE_TRANSPORT = "useOnlyEpollNativeTransport";
public static final String IO_THREADS_COUNT_CONFIG = "ioThreadsCount";
public static final String HASHED_WHEEL_TIMER_TICK_DURATION = "hashedWheelTimerTickDuration";
public static final String HASHED_WHEEL_TIMER_SIZE = "hashedWheelTimerSize";
Expand Down Expand Up @@ -294,6 +295,10 @@ public static boolean defaultUseNativeTransport() {
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getBoolean(ASYNC_CLIENT_CONFIG_ROOT + USE_NATIVE_TRANSPORT_CONFIG);
}

public static boolean defaultUseOnlyEpollNativeTransport() {
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getBoolean(ASYNC_CLIENT_CONFIG_ROOT + USE_ONLY_EPOLL_NATIVE_TRANSPORT);
}

public static int defaultIoThreadsCount() {
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + IO_THREADS_COUNT_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.resolver.NameResolver;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
Expand Down Expand Up @@ -140,12 +141,11 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {

if (allowReleaseEventLoopGroup) {
if (config.isUseNativeTransport()) {
transportFactory = getNativeTransportFactory();
transportFactory = getNativeTransportFactory(config);
} else {
transportFactory = NioTransportFactory.INSTANCE;
}
eventLoopGroup = transportFactory.newEventLoopGroup(config.getIoThreadsCount(), threadFactory);

} else {
eventLoopGroup = config.getEventLoopGroup();

Expand All @@ -155,6 +155,8 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
transportFactory = new EpollTransportFactory();
} else if (eventLoopGroup instanceof KQueueEventLoopGroup) {
transportFactory = new KQueueTransportFactory();
} else if (eventLoopGroup instanceof IOUringEventLoopGroup) {
transportFactory = new IoUringIncubatorTransportFactory();
} else {
throw new IllegalArgumentException("Unknown event loop group " + eventLoopGroup.getClass().getSimpleName());
}
Expand All @@ -167,6 +169,30 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
httpBootstrap.option(ChannelOption.AUTO_READ, false);
}

private static TransportFactory<? extends Channel, ? extends EventLoopGroup> getNativeTransportFactory(AsyncHttpClientConfig config) {
// If we are running on macOS then use KQueue
if (PlatformDependent.isOsx()) {
if (KQueueTransportFactory.isAvailable()) {
return new KQueueTransportFactory();
}
}

// If we're not running on Windows then we're probably running on Linux.
// We will check if Io_Uring is available or not. If available, return IoUringIncubatorTransportFactory.
// Else
// We will check if Epoll is available or not. If available, return EpollTransportFactory.
// If none of the condition matches then no native transport is available, and we will throw an exception.
if (!PlatformDependent.isWindows()) {
if (IoUringIncubatorTransportFactory.isAvailable() && !config.isUseOnlyEpollNativeTransport()) {
return new IoUringIncubatorTransportFactory();
} else if (EpollTransportFactory.isAvailable()) {
return new EpollTransportFactory();
}
}

throw new IllegalArgumentException("No suitable native transport (Epoll, Io_Uring or KQueue) available");
}

public static boolean isSslHandlerConfigured(ChannelPipeline pipeline) {
return pipeline.get(SSL_HANDLER) != null;
}
Expand Down Expand Up @@ -202,24 +228,6 @@ private static Bootstrap newBootstrap(ChannelFactory<? extends Channel> channelF
return bootstrap;
}

private static TransportFactory<? extends Channel, ? extends EventLoopGroup> getNativeTransportFactory() {
String nativeTransportFactoryClassName = null;
if (PlatformDependent.isOsx()) {
nativeTransportFactoryClassName = "org.asynchttpclient.netty.channel.KQueueTransportFactory";
} else if (!PlatformDependent.isWindows()) {
nativeTransportFactoryClassName = "org.asynchttpclient.netty.channel.EpollTransportFactory";
}

try {
if (nativeTransportFactoryClassName != null) {
return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName(nativeTransportFactoryClassName).newInstance();
}
} catch (Exception ignored) {
// Ignore
}
throw new IllegalArgumentException("No suitable native transport (epoll or kqueue) available");
}

public void configureBootstraps(NettyRequestSender requestSender) {
final AsyncHttpClientHandler httpHandler = new HttpHandler(config, this, requestSender);
wsHandler = new WebSocketHandler(config, this, requestSender);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@

class EpollTransportFactory implements TransportFactory<EpollSocketChannel, EpollEventLoopGroup> {

EpollTransportFactory() {
static boolean isAvailable() {
try {
Class.forName("io.netty.channel.epoll.Epoll");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("The epoll transport is not available");
}
if (!Epoll.isAvailable()) {
throw new IllegalStateException("The epoll transport is not supported");
return false;
}
return Epoll.isAvailable();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.netty.channel;

import io.netty.incubator.channel.uring.IOUring;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringSocketChannel;

import java.util.concurrent.ThreadFactory;

class IoUringIncubatorTransportFactory implements TransportFactory<IOUringSocketChannel, IOUringEventLoopGroup> {

static boolean isAvailable() {
try {
Class.forName("io.netty.incubator.channel.uring.IOUring");
} catch (ClassNotFoundException e) {
return false;
}
return IOUring.isAvailable();
}

@Override
public IOUringSocketChannel newChannel() {
return new IOUringSocketChannel();
}

@Override
public IOUringEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
return new IOUringEventLoopGroup(ioThreadsCount, threadFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@

class KQueueTransportFactory implements TransportFactory<KQueueSocketChannel, KQueueEventLoopGroup> {

KQueueTransportFactory() {
static boolean isAvailable() {
try {
Class.forName("io.netty.channel.kqueue.KQueue");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("The kqueue transport is not available");
}
if (!KQueue.isAvailable()) {
throw new IllegalStateException("The kqueue transport is not supported");
return false;
}
return KQueue.isAvailable();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@

public class AsyncHttpClientDefaultsTest {

@Test
public void testDefaultUseOnlyEpollNativeTransport() {
assertFalse(AsyncHttpClientConfigDefaults.defaultUseOnlyEpollNativeTransport());
testBooleanSystemProperty("useOnlyEpollNativeTransport", "defaultUseOnlyEpollNativeTransport", "false");
}

@Test
public void testDefaultMaxTotalConnections() {
assertEquals(AsyncHttpClientConfigDefaults.defaultMaxConnections(), -1);
Expand Down
Loading