-
Notifications
You must be signed in to change notification settings - Fork 13.6k
[Offload] Properly guard modifications to the RPC device array #126790
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Summary: If the user deallocates an RPC device this can sometimes fail if the RPC server is still running. This will happen if the modification happens while the server is still checking it. This patch adds a mutex to guard modifications to it.
@llvm/pr-subscribers-offload Author: Joseph Huber (jhuber6) ChangesSummary: Full diff: https://github.com/llvm/llvm-project/pull/126790.diff 2 Files Affected:
diff --git a/offload/plugins-nextgen/common/include/RPC.h b/offload/plugins-nextgen/common/include/RPC.h
index 7b031083647aa..ab110dd88315a 100644
--- a/offload/plugins-nextgen/common/include/RPC.h
+++ b/offload/plugins-nextgen/common/include/RPC.h
@@ -72,6 +72,9 @@ struct RPCServerTy {
/// Array of associated devices. These must be alive as long as the server is.
std::unique_ptr<plugin::GenericDeviceTy *[]> Devices;
+ /// Mutex that guards accesses to the buffers and device array.
+ std::mutex BufferMutex{};
+
/// A helper class for running the user thread that handles the RPC interface.
/// Because we only need to check the RPC server while any kernels are
/// working, we track submission / completion events to allow the thread to
@@ -90,6 +93,9 @@ struct RPCServerTy {
std::condition_variable CV;
std::mutex Mutex;
+ /// A reference to the main server's mutex.
+ std::mutex &BufferMutex;
+
/// A reference to all the RPC interfaces that the server is handling.
llvm::ArrayRef<void *> Buffers;
@@ -98,9 +104,9 @@ struct RPCServerTy {
/// Initialize the worker thread to run in the background.
ServerThread(void *Buffers[], plugin::GenericDeviceTy *Devices[],
- size_t Length)
- : Running(false), NumUsers(0), CV(), Mutex(), Buffers(Buffers, Length),
- Devices(Devices, Length) {}
+ size_t Length, std::mutex &BufferMutex)
+ : Running(false), NumUsers(0), CV(), Mutex(), BufferMutex(BufferMutex),
+ Buffers(Buffers, Length), Devices(Devices, Length) {}
~ServerThread() { assert(!Running && "Thread not shut down explicitly\n"); }
diff --git a/offload/plugins-nextgen/common/src/RPC.cpp b/offload/plugins-nextgen/common/src/RPC.cpp
index 4289f920c0e1e..70f572923d4b1 100644
--- a/offload/plugins-nextgen/common/src/RPC.cpp
+++ b/offload/plugins-nextgen/common/src/RPC.cpp
@@ -128,6 +128,7 @@ void RPCServerTy::ServerThread::run() {
Lock.unlock();
while (NumUsers.load(std::memory_order_relaxed) > 0 &&
Running.load(std::memory_order_relaxed)) {
+ std::lock_guard<decltype(Mutex)> Lock(BufferMutex);
for (const auto &[Buffer, Device] : llvm::zip_equal(Buffers, Devices)) {
if (!Buffer || !Device)
continue;
@@ -146,7 +147,7 @@ RPCServerTy::RPCServerTy(plugin::GenericPluginTy &Plugin)
Devices(std::make_unique<plugin::GenericDeviceTy *[]>(
Plugin.getNumDevices())),
Thread(new ServerThread(Buffers.get(), Devices.get(),
- Plugin.getNumDevices())) {}
+ Plugin.getNumDevices(), BufferMutex)) {}
llvm::Error RPCServerTy::startThread() {
Thread->startThread();
@@ -187,6 +188,7 @@ Error RPCServerTy::initDevice(plugin::GenericDeviceTy &Device,
if (auto Err = Device.dataSubmit(ClientGlobal.getPtr(), &client,
sizeof(rpc::Client), nullptr))
return Err;
+ std::lock_guard<decltype(BufferMutex)> Lock(BufferMutex);
Buffers[Device.getDeviceId()] = RPCBuffer;
Devices[Device.getDeviceId()] = &Device;
@@ -194,6 +196,7 @@ Error RPCServerTy::initDevice(plugin::GenericDeviceTy &Device,
}
Error RPCServerTy::deinitDevice(plugin::GenericDeviceTy &Device) {
+ std::lock_guard<decltype(BufferMutex)> Lock(BufferMutex);
Device.free(Buffers[Device.getDeviceId()], TARGET_ALLOC_HOST);
Buffers[Device.getDeviceId()] = nullptr;
Devices[Device.getDeviceId()] = nullptr;
|
@@ -128,6 +128,7 @@ void RPCServerTy::ServerThread::run() { | |||
Lock.unlock(); | |||
while (NumUsers.load(std::memory_order_relaxed) > 0 && | |||
Running.load(std::memory_order_relaxed)) { | |||
std::lock_guard<decltype(Mutex)> Lock(BufferMutex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How often do we hit this line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every time we do a full scan of all the ports, so very often but I don't think there's a way to avoid that overhead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So that is when we have something on the device actually using the RPC mechanism then we will hit this overhead, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it might make it slightly slower if you're hitting RPC hard. As in, every thread constantly submitting requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, but if this only occurs when you actually hit RPC I'm much less concerned and think this is fine for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This should address some test flakiness we started observing in the buildbots.
/cherry-pick baf7a3c |
/pull-request #126795 |
…126790) Summary: If the user deallocates an RPC device this can sometimes fail if the RPC server is still running. This will happen if the modification happens while the server is still checking it. This patch adds a mutex to guard modifications to it.
…126790) Summary: If the user deallocates an RPC device this can sometimes fail if the RPC server is still running. This will happen if the modification happens while the server is still checking it. This patch adds a mutex to guard modifications to it.
…126790) Summary: If the user deallocates an RPC device this can sometimes fail if the RPC server is still running. This will happen if the modification happens while the server is still checking it. This patch adds a mutex to guard modifications to it.
…126790) Summary: If the user deallocates an RPC device this can sometimes fail if the RPC server is still running. This will happen if the modification happens while the server is still checking it. This patch adds a mutex to guard modifications to it.
Summary:
If the user deallocates an RPC device this can sometimes fail if the RPC
server is still running. This will happen if the modification happens
while the server is still checking it. This patch adds a mutex to guard
modifications to it.