最近忙的焦头烂额,只能匆匆更新一篇最近发现的folly中RCU的bug。不会对RCU做过多详细介绍,只会大致分析folly中RCU的实现,以及这个bug是如何造成的。

使用方式

// void doSomething(IPAddress host);
// std::atomic<ConfigData*> globalConfigData;
//
//
// void reader() {
//   while (true) {
//     IPAddress curManagementServer;
//     {
//       // We're about to do some reads we want to protect; if we read a
//       // pointer, we need to make sure that if the writer comes along and
//       // updates it, the writer's cleanup operation won't happen until we're
//       // done accessing the pointed-to data. We get a Guard on that
//       // domain; as long as it exists, no function subsequently passed to
//       // invokeEventually will execute.
//       std::scoped_lock<rcu_domain>(rcu_default_domain());
//       ConfigData* configData = globalConfigData.load();
//       // We created a guard before we read globalConfigData; we know that the
//       // pointer will remain valid until the guard is destroyed.
//       curManagementServer = configData->managementServerIP;
//       // RCU domain via the scoped mutex is released here; retired objects
//       // may be freed.
//     }
//     doSomethingWith(curManagementServer);
//   }
// }
//
// void writer() {
//
//   while (true) {
//     std::this_thread::sleep_for(std::chrono::seconds(60));
//     ConfigData* oldConfigData = globalConfigData.load();
//     ConfigData* newConfigData = loadConfigDataFromRemoteServer();
//     globalConfigData.store(newConfigData);
//     rcu_retire(oldConfigData);
//     // alternatively, in a blocking manner:
//     //   rcu_synchronize();
//     //   delete oldConfigData;
//   }
// }

rcu_domain

// This implementation does implement an rcu_domain, and provides a default
// one for use per the standard implementation.
//
// rcu_domain:
//   A "universe" of deferred execution. Each rcu_domain has an
//   executor on which deferred functions may execute. Readers enter
//   a read region in an rcu_domain by creating an instance of an
//   std::scoped_lock<folly::rcu_domain> object on the domain. The scoped
//   lock provides RAII semantics for read region protection over the domain.
//
//   rcu_domains should in general be completely separated;
//   std::scoped_lock<folly::rcu_domain> locks created on one domain do not
//   prevent functions deferred on other domains from running. It's intended
//   that most callers should only ever use the default, global domain.
//
//   You should use a custom rcu_domain if you can't avoid sleeping
//   during reader critical sections (because you don't want to block
//   all other users of the domain while you sleep), or you want to change
//   the default executor type on which retire callbacks are invoked.
//   Otherwise, users are strongly encouraged to use the default domain.

API

// Waits for all pre-existing RCU readers to complete.
// RCU readers will normally be marked using the RAII interface
// std::scoped_lock<folly::rcu_domain>, as in:
//
// std::scoped_lock<folly::rcu_domain> rcuReader(rcu_default_domain());
//
// Other locking primitives that provide moveable semantics such as
// std::unique_lock may be used as well.
//
// Note that rcu_synchronize is not obligated to wait for RCU readers that
// start after rcu_synchronize starts.  Note also that holding a lock across
// rcu_synchronize that is acquired by any deleter (as in is passed to
// rcu_retire, retire, or call) will result in deadlock.  Note that such
// deadlock will normally only occur with user-written deleters, as in the
// default of delele will normally be immune to such deadlocks.
inline void rcu_synchronize(
    rcu_domain& domain = rcu_default_domain()) noexcept {
  domain.synchronize();
}

// Waits for all in-flight deleters to complete.
//
// An in-flight deleter is one that has already been passed to rcu_retire,
// retire, or call.  In other words, rcu_barrier is not obligated to wait
// on any deleters passed to later calls to rcu_retire, retire, or call.
//
// And yes, the current implementation is buggy, and will be fixed.
inline void rcu_barrier(rcu_domain& domain = rcu_default_domain()) noexcept {
  domain.synchronize();
}

// Free-function retire.  Always allocates.
//
// This will invoke the deleter d(p) asynchronously some time after all
// pre-existing RCU readers have completed.  See synchronize_rcu() for more
// information about RCU readers and domains.
template <typename T, typename D = std::default_delete<T>>
void rcu_retire(T* p, D d = {}, rcu_domain& domain = rcu_default_domain()) {
  domain.call([p, del = std::move(d)]() { del(p); });
}
  • rcu_synchronizercu_barrier是同步等待所有reader退出,其内部会最多调用两次half_sync
  • rcu_retire则是注册一个deleter,保存在一个闭包中,当所有reader退出critical sections,就会触发这个deleter闭包

synchronize

每次half_sync之后epoch会加1,一次synchronize最多执行两次half_sync,每次half_sync会阻塞等待。

当确保[curr, target)范围之内的reader都退出critical sections之后,将还没有调用过deleter闭包保存到finished之中,然后开始触发这些deleter。


  // Ensure concurrent critical sections have finished.
  // Always waits for full synchronization.
  // read lock *must not* be held.
  void synchronize() noexcept {
    auto curr = version_.load(std::memory_order_acquire);
    // Target is two epochs away.
    auto target = curr + 2;
    while (true) {
      // Try to assign ourselves to do the sync work.
      // If someone else is already assigned, we can wait for
      // the work to be finished by waiting on turn_.
      auto work = work_.load(std::memory_order_acquire);
      auto tmp = work;
      if (work < target && work_.compare_exchange_strong(tmp, target)) {
        list_head finished;
        {
          std::lock_guard<std::mutex> g(syncMutex_);
          while (version_.load(std::memory_order_acquire) < target) {
            half_sync(true, finished);
          }
        }
        // callbacks are called outside of syncMutex_
        // 调用过期对象的析构
        finished.forEach(
            [&](list_node* node) { executor_->add(std::move(node->cb_)); });
        return;
      } else {
        // 其他线程已经进入到了上面的if分支 检查epoch 如果还没有完成等待即可
        if (version_.load(std::memory_order_acquire) >= target) {
          return;
        }
        std::atomic<uint32_t> cutoff{100};
        // Wait for someone to finish the work.
        turn_.tryWaitForTurn(to_narrow(work), cutoff, false);
      }
    }
  }

retire

将retired functions保存在一个list_node中,然后保存到q_这个thread local list中。然后根据上次retire中调用half_sync的时间间隔,决定这次是否需要调用half_syncretire中调用half_sync不需要阻塞等待。

  // Invokes cbin(this) and then deletes this some time after all pre-existing
  // RCU readers have completed.  See rcu_synchronize() for more information
  // about RCU readers and domains.
  template <typename T>
  void call(T&& cbin) {
    auto node = new list_node;
    node->cb_ = [node, cb = std::forward<T>(cbin)]() {
      cb();
      delete node;
    };
    retire(node);
  }

  // Invokes node->cb_(node) some time after all pre-existing RCU readers
  // have completed.  See rcu_synchronize() for more information about RCU
  // readers and domains.
  void retire(list_node* node) noexcept {
    q_.push(node);

    // Note that it's likely we hold a read lock here,
    // so we can only half_sync(false).  half_sync(true)
    // or a synchronize() call might block forever.
    uint64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(
                        std::chrono::steady_clock::now().time_since_epoch())
                        .count();
    auto syncTime = syncTime_.load(std::memory_order_relaxed);
    if (time > syncTime + syncTimePeriod_ &&
        syncTime_.compare_exchange_strong(
            syncTime, time, std::memory_order_relaxed)) {
      list_head finished;
      {
        std::lock_guard<std::mutex> g(syncMutex_);
        half_sync(false, finished);
      }
      // callbacks are called outside of syncMutex_
      // 调用过期对象的析构
      finished.forEach(
          [&](list_node* item) { executor_->add(std::move(item->cb_)); });
    }
  }

half_sync

函数如其名,是一半的synchronize,调用完这个函数之后,epoch会加1。主要流程如下:

  • q_中的所有闭包收集到queues_[0]中,注意queues_[0]是最新的闭包,queues_[1]中的闭包是之前epoch的闭包
  • 如果是阻塞,就等待所有reader退出critical sections(内部基于futex实现等待)
  • 如果非阻塞,就检查epoch是否还有对应的reader
  • 如果确定所有reader已经退出,就将相应的deleter闭包收集到finished中,然后将queues_[0]中的闭包都移动到queues_[1]
  • 更新epoch

对于一个deleter闭包,它被调用的路径如下:

  1. rcu_retire时,会将闭包X保存在q_
  2. 第一次调用half_sync时:
    1. 会将q_中的闭包(包括X),移动到queues_[0]
    2. 然后将queues_[1]的闭包移动到finished中,在这次half_sync完成之后调用
    3. queues_[0]中的闭包移动到queues_[1],包括X
  3. 第二次调用half_sync时,重复上面的流程,此时queues_[1]中的X就会被移动到finished中,随后就会调用
  // Move queues through one epoch (half of a full synchronize()).
  // Will block waiting for readers to exit if blocking is true.
  // blocking must *not* be true if the current thread is locked,
  // or will deadlock.
  //
  // returns a list of callbacks ready to run in finished.
  void half_sync(bool blocking, list_head& finished) {
    auto curr = version_.load(std::memory_order_acquire);
    auto next = curr + 1;

    // Push all work to a queue for moving through two epochs.  One
    // version is not enough because of late readers of the version_
    // counter in lock_shared.
    //
    // Note that for a similar reason we can't swap out the q here,
    // and instead drain it, so concurrent calls to call() are safe,
    // and will wait for the next epoch.
    q_.collect(queues_[0]);

    if (blocking) {
      counters_.waitForZero(next & 1);
    } else {
      if (!counters_.epochIsClear(next & 1)) {
        return;
      }
    }

    // Run callbacks that have been through two epochs, and swap queues
    // for those only through a single epoch.
    finished.splice(queues_[1]);
    queues_[1].splice(queues_[0]);

    version_.store(next, std::memory_order_release);
    // Notify synchronous waiters in synchronize().
    turn_.completeTurn(to_narrow(curr));
  }

note

retired functions,即rcu_retire时传入的deleter函数有以下限制:

  1. 不能抛异常
  2. 如果使用的是default domain,不能在deleter函数中使用任何互斥锁(否则会导致死锁)
  3. 调用同步接口rcu_synchronize时,不能持有任何在retired functions中也会使用的互斥锁(否则也会死锁)

几个关键API的性能:

  • deferral latency (即什么时候会释放过期对象)

      // Deferral latency is as low as is practical: overhead involves running
      // several global memory barriers on the machine to ensure all readers are gone.
    
  • rcu_synchronize

      // rcu_synchronize() call latency is on the order of 10ms.  Multiple
      // separate threads can share a synchronized period and should scale.
    
  • rcu_retire

      // rcu_retire() is a queue push, and on the order of 150 ns, however,
      // the current implementation may synchronize if the retire queue is full,
      // resulting in tail latencies of ~10ms.
    
  • std::scoped_lock<rcu_domain> (即reader critical secions的构造和析构时间)

      // std::scoped_lock<rcu_domain> creation/destruction is ~5ns.  By comparison,
      // folly::SharedMutex::lock_shared + unlock_shared pair is ~26ns
    

数据结构

class rcu_domain {
  using list_head = typename detail::ThreadCachedLists::ListHead;
  using list_node = typename detail::ThreadCachedLists::Node;

private:
  // 用来统计有多少正在运行的reader
  detail::ThreadCachedReaders counters_;
  // Global epoch.
  std::atomic<uint64_t> version_{0};

  // Future epochs being driven by threads in synchronize
  // work_保存上次执行rcu_synchronize的epoch
  std::atomic<uint64_t> work_{0};

  // Matches version_, for waking up threads in synchronize that are
  // sharing an epoch.
  detail::TurnSequencer<std::atomic> turn_;
  // Only one thread can drive half_sync.
  std::mutex syncMutex_;
  // Currently half_sync takes ~16ms due to heavy barriers.
  // Ensure that if used in a single thread, max GC time
  // is limited to 1% of total CPU time.
  static constexpr uint64_t syncTimePeriod_{1600 * 2 /* full sync is 2x */};
  std::atomic<uint64_t> syncTime_{0};

  // call()s waiting to move through two epochs.
  // 每个线程的retired functions会保存到这个thread local list中
  detail::ThreadCachedLists q_;
  // Executor callbacks will eventually be run on.
  Executor* executor_{nullptr};

  // Queues for callbacks waiting to go through two epochs.
  list_head queues_[2]{};
};

ThreadCachedReaders

// A data structure that keeps a per-thread cache of a bitfield that contains
// the current active epoch for readers in the thread, and the number of active
// readers:
//
//                _______________________________________
//                |   Current Epoch    |    # Readers   |
// epoch_readers: | 63 62 ... 34 33 32 | 31 30 ... 2 1 0|
//                o--------------------|----------------o
//
// There are several important implications with this data structure:
//
// 1. Read regions must be entered and exited on the same thread.
// 2. Read regions are fully nested. That is, two read regions in a single
//    thread may not overlap across two epochs.

buggy?

通常来说,rcu_barrier能保证之前调用的rcu_retire的deleter都被调用,但在并发场景下,rcu_barrier并不能保障这一点。下面的t2就会在assert(done)这里断言失败。

TEST(RCUTest, Doodle) {
    folly::rcu_domain domain;
    std::atomic<int*> val;
    val.store(new int(0));
    std::mutex mu;
    bool stopped = false;

    std::thread t1([&domain, &val, &stopped, &mu] {
        int i = 1;
        while (true) {
            {
                std::lock_guard lock(mu);
                if (stopped) {
                    break;
                }
            }
            auto oldVal = val.load();
            auto newVal = new int(i++);
            val.store(newVal);
            folly::rcu_retire(
                    oldVal, [](int* ptr) { delete ptr; }, domain);
            folly::rcu_barrier(domain);
        }
    });

    std::thread t2([&domain, &val, &stopped, &mu] {
        {
            std::lock_guard lock(mu);
            stopped = true;
        }
        auto oldVal = val.load();
        auto newVal = new int(0);
        val.store(newVal);
        bool done = false;
        folly::rcu_retire(
                oldVal,
                [&done](int* ptr) {
                    delete ptr;
                    done = true;
                },
                domain);
        folly::rcu_barrier(domain);
        assert(done);
    });

    t1.join();
    t2.join();
}

t2中我们先执行了一次rcu_retire,再执行一次rcu_barrier。按照rcu_barrier功能,任何在rcu_barrier之前retire的对象,都应该被调用其deleter,即done应该为true,但实际上断言assert(done)会失败。翻看对应注释,里面提到当前实现有bug。

// Waits for all in-flight deleters to complete.
//
// An in-flight deleter is one that has already been passed to rcu_retire,
// retire, or call.  In other words, rcu_barrier is not obligated to wait
// on any deleters passed to later calls to rcu_retire, retire, or call.
//
// And yes, the current implementation is buggy, and will be fixed.
inline void rcu_barrier(rcu_domain& domain = rcu_default_domain()) noexcept {
  domain.synchronize();
}

我们通过rr可以复现这个问题,看看这个bug是怎么出现的:

global epoch(即version_) t1 t2
8860 rcu_barrier  
  → first half_sync  
  → collect  
    rcu_retire
    rcu_barrier
8861 → set epoch as 8861  
     
    → first half_sync
8862   → set epoch as 8862
  → won’t call second half_sync  
    → won’t call second half_sync

当global epoch(即代码中的version_)为8860时,t1调用了rcu_barrier,它应该通过调用最多两次half_sync,阻塞等待所有epoch小于8860 + 2 = 8862的reader退出作用域。在第一次执行half_sync的期间,t2执行了一次rcu_retire,我们将这个对象记为XX的deleter闭包会保存在q_中。由于t1第一次的half_sync中的collect执行在前,t1不会感知到q_中的X的存在。随后t2也可开始调用了rcu_barrier,它同样也会最多调用两次half_sync,直到epoch大于等于8862(在t2的视角,X的deleter一定会被调用)。

此时t1第一次half_sync完成,将epoch更新为8861。之后t2也调用了第一次half_sync,将epoch更新为8862,X的deleter闭包此时会保存在queues_[1]中。

之后,无论是t1t2,由于epoch已经为8862,距离开始调用rcu_barrier时的epoch 8860已经过去了两个epoch,因此都不会调用第二次half_sync。因此X的deleter闭包仍然停留在queues_[1]中,没有被调用。所以我们测试代码中的断言才会失败。

因此,代码注释中所说rcu_barrier的bug就是:在并发场景下并不能保证在调用rcu_barrier之前的rcu_retire对象的deleter一定触发。因此在调用rcu_barrier时,如果对deleter的调用有期望的时候(比如测试代码中期望done设置为true),只调用一次rcu_barrier是不够的。

要解决这个问题也很简单,如果对rcu_retire的对象deleter有调用期望,我们执行两次rcu_barrier即可,这样一定能保证X的deleter闭包一定会被调用(只要再调用一次half_sync,在queues_[1]中的X闭包一定会被移出queues_[1]并调用)。