Buggy folly::RCU
最近忙的焦头烂额,只能匆匆更新一篇最近发现的folly中RCU的bug。不会对RCU做过多详细介绍,只会大致分析folly中RCU的实现,以及这个bug是如何造成的。
使用方式
// void doSomething(IPAddress host);
// std::atomic<ConfigData*> globalConfigData;
//
//
// void reader() {
// while (true) {
// IPAddress curManagementServer;
// {
// // We're about to do some reads we want to protect; if we read a
// // pointer, we need to make sure that if the writer comes along and
// // updates it, the writer's cleanup operation won't happen until we're
// // done accessing the pointed-to data. We get a Guard on that
// // domain; as long as it exists, no function subsequently passed to
// // invokeEventually will execute.
// std::scoped_lock<rcu_domain>(rcu_default_domain());
// ConfigData* configData = globalConfigData.load();
// // We created a guard before we read globalConfigData; we know that the
// // pointer will remain valid until the guard is destroyed.
// curManagementServer = configData->managementServerIP;
// // RCU domain via the scoped mutex is released here; retired objects
// // may be freed.
// }
// doSomethingWith(curManagementServer);
// }
// }
//
// void writer() {
//
// while (true) {
// std::this_thread::sleep_for(std::chrono::seconds(60));
// ConfigData* oldConfigData = globalConfigData.load();
// ConfigData* newConfigData = loadConfigDataFromRemoteServer();
// globalConfigData.store(newConfigData);
// rcu_retire(oldConfigData);
// // alternatively, in a blocking manner:
// // rcu_synchronize();
// // delete oldConfigData;
// }
// }
rcu_domain
// This implementation does implement an rcu_domain, and provides a default
// one for use per the standard implementation.
//
// rcu_domain:
// A "universe" of deferred execution. Each rcu_domain has an
// executor on which deferred functions may execute. Readers enter
// a read region in an rcu_domain by creating an instance of an
// std::scoped_lock<folly::rcu_domain> object on the domain. The scoped
// lock provides RAII semantics for read region protection over the domain.
//
// rcu_domains should in general be completely separated;
// std::scoped_lock<folly::rcu_domain> locks created on one domain do not
// prevent functions deferred on other domains from running. It's intended
// that most callers should only ever use the default, global domain.
//
// You should use a custom rcu_domain if you can't avoid sleeping
// during reader critical sections (because you don't want to block
// all other users of the domain while you sleep), or you want to change
// the default executor type on which retire callbacks are invoked.
// Otherwise, users are strongly encouraged to use the default domain.
API
// Waits for all pre-existing RCU readers to complete.
// RCU readers will normally be marked using the RAII interface
// std::scoped_lock<folly::rcu_domain>, as in:
//
// std::scoped_lock<folly::rcu_domain> rcuReader(rcu_default_domain());
//
// Other locking primitives that provide moveable semantics such as
// std::unique_lock may be used as well.
//
// Note that rcu_synchronize is not obligated to wait for RCU readers that
// start after rcu_synchronize starts. Note also that holding a lock across
// rcu_synchronize that is acquired by any deleter (as in is passed to
// rcu_retire, retire, or call) will result in deadlock. Note that such
// deadlock will normally only occur with user-written deleters, as in the
// default of delele will normally be immune to such deadlocks.
inline void rcu_synchronize(
rcu_domain& domain = rcu_default_domain()) noexcept {
domain.synchronize();
}
// Waits for all in-flight deleters to complete.
//
// An in-flight deleter is one that has already been passed to rcu_retire,
// retire, or call. In other words, rcu_barrier is not obligated to wait
// on any deleters passed to later calls to rcu_retire, retire, or call.
//
// And yes, the current implementation is buggy, and will be fixed.
inline void rcu_barrier(rcu_domain& domain = rcu_default_domain()) noexcept {
domain.synchronize();
}
// Free-function retire. Always allocates.
//
// This will invoke the deleter d(p) asynchronously some time after all
// pre-existing RCU readers have completed. See synchronize_rcu() for more
// information about RCU readers and domains.
template <typename T, typename D = std::default_delete<T>>
void rcu_retire(T* p, D d = {}, rcu_domain& domain = rcu_default_domain()) {
domain.call([p, del = std::move(d)]() { del(p); });
}
rcu_synchronize
和rcu_barrier
是同步等待所有reader退出,其内部会最多调用两次half_sync
rcu_retire
则是注册一个deleter,保存在一个闭包中,当所有reader退出critical sections,就会触发这个deleter闭包
synchronize
每次half_sync
之后epoch会加1,一次synchronize
最多执行两次half_sync
,每次half_sync
会阻塞等待。
当确保[curr, target)
范围之内的reader都退出critical sections之后,将还没有调用过deleter闭包保存到finished
之中,然后开始触发这些deleter。
// Ensure concurrent critical sections have finished.
// Always waits for full synchronization.
// read lock *must not* be held.
void synchronize() noexcept {
auto curr = version_.load(std::memory_order_acquire);
// Target is two epochs away.
auto target = curr + 2;
while (true) {
// Try to assign ourselves to do the sync work.
// If someone else is already assigned, we can wait for
// the work to be finished by waiting on turn_.
auto work = work_.load(std::memory_order_acquire);
auto tmp = work;
if (work < target && work_.compare_exchange_strong(tmp, target)) {
list_head finished;
{
std::lock_guard<std::mutex> g(syncMutex_);
while (version_.load(std::memory_order_acquire) < target) {
half_sync(true, finished);
}
}
// callbacks are called outside of syncMutex_
// 调用过期对象的析构
finished.forEach(
[&](list_node* node) { executor_->add(std::move(node->cb_)); });
return;
} else {
// 其他线程已经进入到了上面的if分支 检查epoch 如果还没有完成等待即可
if (version_.load(std::memory_order_acquire) >= target) {
return;
}
std::atomic<uint32_t> cutoff{100};
// Wait for someone to finish the work.
turn_.tryWaitForTurn(to_narrow(work), cutoff, false);
}
}
}
retire
将retired functions保存在一个list_node
中,然后保存到q_
这个thread local list中。然后根据上次retire
中调用half_sync
的时间间隔,决定这次是否需要调用half_sync
,retire
中调用half_sync
不需要阻塞等待。
// Invokes cbin(this) and then deletes this some time after all pre-existing
// RCU readers have completed. See rcu_synchronize() for more information
// about RCU readers and domains.
template <typename T>
void call(T&& cbin) {
auto node = new list_node;
node->cb_ = [node, cb = std::forward<T>(cbin)]() {
cb();
delete node;
};
retire(node);
}
// Invokes node->cb_(node) some time after all pre-existing RCU readers
// have completed. See rcu_synchronize() for more information about RCU
// readers and domains.
void retire(list_node* node) noexcept {
q_.push(node);
// Note that it's likely we hold a read lock here,
// so we can only half_sync(false). half_sync(true)
// or a synchronize() call might block forever.
uint64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
auto syncTime = syncTime_.load(std::memory_order_relaxed);
if (time > syncTime + syncTimePeriod_ &&
syncTime_.compare_exchange_strong(
syncTime, time, std::memory_order_relaxed)) {
list_head finished;
{
std::lock_guard<std::mutex> g(syncMutex_);
half_sync(false, finished);
}
// callbacks are called outside of syncMutex_
// 调用过期对象的析构
finished.forEach(
[&](list_node* item) { executor_->add(std::move(item->cb_)); });
}
}
half_sync
函数如其名,是一半的synchronize
,调用完这个函数之后,epoch会加1。主要流程如下:
- 将
q_
中的所有闭包收集到queues_[0]
中,注意queues_[0]
是最新的闭包,queues_[1]
中的闭包是之前epoch的闭包 - 如果是阻塞,就等待所有reader退出critical sections(内部基于futex实现等待)
- 如果非阻塞,就检查epoch是否还有对应的reader
- 如果确定所有reader已经退出,就将相应的deleter闭包收集到finished中,然后将
queues_[0]
中的闭包都移动到queues_[1]
中 - 更新epoch
对于一个deleter闭包,它被调用的路径如下:
- 在
rcu_retire
时,会将闭包X
保存在q_
中 - 第一次调用
half_sync
时:- 会将
q_
中的闭包(包括X),移动到queues_[0]
- 然后将
queues_[1]
的闭包移动到finished中,在这次half_sync完成之后调用 - 将
queues_[0]
中的闭包移动到queues_[1]
,包括X
- 会将
- 第二次调用
half_sync
时,重复上面的流程,此时queues_[1]
中的X
就会被移动到finished中
,随后就会调用
// Move queues through one epoch (half of a full synchronize()).
// Will block waiting for readers to exit if blocking is true.
// blocking must *not* be true if the current thread is locked,
// or will deadlock.
//
// returns a list of callbacks ready to run in finished.
void half_sync(bool blocking, list_head& finished) {
auto curr = version_.load(std::memory_order_acquire);
auto next = curr + 1;
// Push all work to a queue for moving through two epochs. One
// version is not enough because of late readers of the version_
// counter in lock_shared.
//
// Note that for a similar reason we can't swap out the q here,
// and instead drain it, so concurrent calls to call() are safe,
// and will wait for the next epoch.
q_.collect(queues_[0]);
if (blocking) {
counters_.waitForZero(next & 1);
} else {
if (!counters_.epochIsClear(next & 1)) {
return;
}
}
// Run callbacks that have been through two epochs, and swap queues
// for those only through a single epoch.
finished.splice(queues_[1]);
queues_[1].splice(queues_[0]);
version_.store(next, std::memory_order_release);
// Notify synchronous waiters in synchronize().
turn_.completeTurn(to_narrow(curr));
}
note
retired functions,即rcu_retire
时传入的deleter函数有以下限制:
- 不能抛异常
- 如果使用的是default domain,不能在deleter函数中使用任何互斥锁(否则会导致死锁)
- 调用同步接口
rcu_synchronize
时,不能持有任何在retired functions中也会使用的互斥锁(否则也会死锁)
几个关键API的性能:
-
deferral latency (即什么时候会释放过期对象)
// Deferral latency is as low as is practical: overhead involves running // several global memory barriers on the machine to ensure all readers are gone.
-
rcu_synchronize
// rcu_synchronize() call latency is on the order of 10ms. Multiple // separate threads can share a synchronized period and should scale.
-
rcu_retire
// rcu_retire() is a queue push, and on the order of 150 ns, however, // the current implementation may synchronize if the retire queue is full, // resulting in tail latencies of ~10ms.
-
std::scoped_lock<rcu_domain>
(即reader critical secions的构造和析构时间)// std::scoped_lock<rcu_domain> creation/destruction is ~5ns. By comparison, // folly::SharedMutex::lock_shared + unlock_shared pair is ~26ns
数据结构
class rcu_domain {
using list_head = typename detail::ThreadCachedLists::ListHead;
using list_node = typename detail::ThreadCachedLists::Node;
private:
// 用来统计有多少正在运行的reader
detail::ThreadCachedReaders counters_;
// Global epoch.
std::atomic<uint64_t> version_{0};
// Future epochs being driven by threads in synchronize
// work_保存上次执行rcu_synchronize的epoch
std::atomic<uint64_t> work_{0};
// Matches version_, for waking up threads in synchronize that are
// sharing an epoch.
detail::TurnSequencer<std::atomic> turn_;
// Only one thread can drive half_sync.
std::mutex syncMutex_;
// Currently half_sync takes ~16ms due to heavy barriers.
// Ensure that if used in a single thread, max GC time
// is limited to 1% of total CPU time.
static constexpr uint64_t syncTimePeriod_{1600 * 2 /* full sync is 2x */};
std::atomic<uint64_t> syncTime_{0};
// call()s waiting to move through two epochs.
// 每个线程的retired functions会保存到这个thread local list中
detail::ThreadCachedLists q_;
// Executor callbacks will eventually be run on.
Executor* executor_{nullptr};
// Queues for callbacks waiting to go through two epochs.
list_head queues_[2]{};
};
ThreadCachedReaders
// A data structure that keeps a per-thread cache of a bitfield that contains
// the current active epoch for readers in the thread, and the number of active
// readers:
//
// _______________________________________
// | Current Epoch | # Readers |
// epoch_readers: | 63 62 ... 34 33 32 | 31 30 ... 2 1 0|
// o--------------------|----------------o
//
// There are several important implications with this data structure:
//
// 1. Read regions must be entered and exited on the same thread.
// 2. Read regions are fully nested. That is, two read regions in a single
// thread may not overlap across two epochs.
buggy?
通常来说,rcu_barrier
能保证之前调用的rcu_retire
的deleter都被调用,但在并发场景下,rcu_barrier
并不能保障这一点。下面的t2
就会在assert(done)
这里断言失败。
TEST(RCUTest, Doodle) {
folly::rcu_domain domain;
std::atomic<int*> val;
val.store(new int(0));
std::mutex mu;
bool stopped = false;
std::thread t1([&domain, &val, &stopped, &mu] {
int i = 1;
while (true) {
{
std::lock_guard lock(mu);
if (stopped) {
break;
}
}
auto oldVal = val.load();
auto newVal = new int(i++);
val.store(newVal);
folly::rcu_retire(
oldVal, [](int* ptr) { delete ptr; }, domain);
folly::rcu_barrier(domain);
}
});
std::thread t2([&domain, &val, &stopped, &mu] {
{
std::lock_guard lock(mu);
stopped = true;
}
auto oldVal = val.load();
auto newVal = new int(0);
val.store(newVal);
bool done = false;
folly::rcu_retire(
oldVal,
[&done](int* ptr) {
delete ptr;
done = true;
},
domain);
folly::rcu_barrier(domain);
assert(done);
});
t1.join();
t2.join();
}
在t2
中我们先执行了一次rcu_retire
,再执行一次rcu_barrier
。按照rcu_barrier
功能,任何在rcu_barrier
之前retire的对象,都应该被调用其deleter,即done
应该为true,但实际上断言assert(done)
会失败。翻看对应注释,里面提到当前实现有bug。
// Waits for all in-flight deleters to complete.
//
// An in-flight deleter is one that has already been passed to rcu_retire,
// retire, or call. In other words, rcu_barrier is not obligated to wait
// on any deleters passed to later calls to rcu_retire, retire, or call.
//
// And yes, the current implementation is buggy, and will be fixed.
inline void rcu_barrier(rcu_domain& domain = rcu_default_domain()) noexcept {
domain.synchronize();
}
我们通过rr可以复现这个问题,看看这个bug是怎么出现的:
global epoch(即version_) | t1 | t2 |
---|---|---|
8860 | rcu_barrier | |
→ first half_sync | ||
→ collect | ||
rcu_retire | ||
rcu_barrier | ||
8861 | → set epoch as 8861 | |
→ first half_sync | ||
8862 | → set epoch as 8862 | |
→ won’t call second half_sync | ||
→ won’t call second half_sync |
当global epoch(即代码中的version_
)为8860时,t1
调用了rcu_barrier
,它应该通过调用最多两次half_sync
,阻塞等待所有epoch小于8860 + 2 = 8862
的reader退出作用域。在第一次执行half_sync
的期间,t2
执行了一次rcu_retire
,我们将这个对象记为X
,X
的deleter闭包会保存在q_
中。由于t1
第一次的half_sync
中的collect
执行在前,t1
不会感知到q_
中的X
的存在。随后t2
也可开始调用了rcu_barrier
,它同样也会最多调用两次half_sync
,直到epoch大于等于8862(在t2
的视角,X
的deleter一定会被调用)。
此时t1
第一次half_sync
完成,将epoch更新为8861。之后t2也调用了第一次half_sync
,将epoch更新为8862,X的deleter闭包此时会保存在queues_[1]
中。
之后,无论是t1
和t2
,由于epoch已经为8862,距离开始调用rcu_barrier
时的epoch 8860已经过去了两个epoch,因此都不会调用第二次half_sync
。因此X
的deleter闭包仍然停留在queues_[1]
中,没有被调用。所以我们测试代码中的断言才会失败。
因此,代码注释中所说rcu_barrier
的bug就是:在并发场景下并不能保证在调用rcu_barrier
之前的rcu_retire
对象的deleter一定触发。因此在调用rcu_barrier
时,如果对deleter的调用有期望的时候(比如测试代码中期望done
设置为true
),只调用一次rcu_barrier
是不够的。
要解决这个问题也很简单,如果对rcu_retire
的对象deleter有调用期望,我们执行两次rcu_barrier
即可,这样一定能保证X
的deleter闭包一定会被调用(只要再调用一次half_sync
,在queues_[1]
中的X
闭包一定会被移出queues_[1]
并调用)。