Recently I looked into a problem where, in some extreme cases, the process cored while the service was shutting down, and took the opportunity to re-walk fbthrift's start-up and shutdown flow. In one of our service processes we start multiple ThriftServers that share the same IO thread pool and worker thread pool. Besides the RPC servers exposing interfaces to the outside, handling an RPC request may also call into other components of the process (e.g. a kv engine), so at shutdown we have to take the shared thread pools into account, as well as the order in which the components are stopped and destructed. Otherwise we can end up with, say, a component already destructed during shutdown while an RPC is still running and calling into it, and the process cores.
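
To make this setup concrete, here is a minimal sketch of two ThriftServers in one process wired to the same IO pool and the same ThreadManager. The handler types (KvHandler, EchoHandler) are hypothetical, and the setters shown (setIOThreadPool, setThreadManager, setInterface) should be checked against the fbthrift version you build with:

#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/executors/thread_factory/NamedThreadFactory.h>
#include <thrift/lib/cpp/concurrency/PosixThreadFactory.h>
#include <thrift/lib/cpp/concurrency/ThreadManager.h>
#include <thrift/lib/cpp2/server/ThriftServer.h>

void initServers() {
  // One IO pool and one CPU pool, shared by both servers. In real code these
  // would be owned by some long-lived object, not a local scope.
  auto ioPool = std::make_shared<folly::IOThreadPoolExecutor>(
      8, std::make_shared<folly::NamedThreadFactory>("SharedIO"));
  auto threadManager =
      apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(16);
  threadManager->threadFactory(
      std::make_shared<apache::thrift::concurrency::PosixThreadFactory>());
  threadManager->start();

  auto kvServer = std::make_shared<apache::thrift::ThriftServer>();
  kvServer->setPort(9001);
  kvServer->setInterface(std::make_shared<KvHandler>());     // hypothetical handler
  kvServer->setIOThreadPool(ioPool);                         // share the IO pool
  kvServer->setThreadManager(threadManager);                 // share the CPU pool

  auto echoServer = std::make_shared<apache::thrift::ThriftServer>();
  echoServer->setPort(9002);
  echoServer->setInterface(std::make_shared<EchoHandler>()); // hypothetical handler
  echoServer->setIOThreadPool(ioPool);
  echoServer->setThreadManager(threadManager);
  // Because the pools (and the components the handlers call into) are shared,
  // the order in which servers and components are stopped/destructed matters.
}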

This post does not cover fbthrift's core concepts in detail; for those, refer to this series of articles.

start-up

Start-up mainly involves two interfaces:

  • serve
  • setup

serve

serve mainly calls setup and then loops an EventBase, parking the thread until shutdown. When the EventBase loop exits, serve calls cleanUp to clean up. Let's state a principle up front:

`serve` pairs with `stop`, and `setup` pairs with `cleanUp`.

void ThriftServer::serve() {
  setup();
  if (serverChannel_ != nullptr) {
    // A duplex server (the one running on a client) doesn't uses its own EB
    // since it reuses the client's EB
    return;
  }
  SCOPE_EXIT { this->cleanUp(); };

  auto sslContextConfigCallbackHandle = sslContextObserver_
      ? getSSLCallbackHandle()
      : folly::observer::CallbackHandle{};

  tracker_.emplace(instrumentation::kThriftServerTrackerKey, *this);
  eventBaseManager_->getEventBase()->loopForever();
}
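
As a usage note on the serve/stop pairing: serve() blocks in loopForever(), so a common pattern (just a sketch, reusing a server configured as in the earlier example and the includes from there) is to run it on a dedicated thread and trigger shutdown from elsewhere via stop(); the SCOPE_EXIT above then runs cleanUp() once the loop returns:

#include <thread>

// Sketch: serve() blocks the calling thread until stop() terminates the serve
// EventBase loop; cleanUp() then runs via the SCOPE_EXIT shown above.
void runAndStop(std::shared_ptr<apache::thrift::ThriftServer> server) {
  std::thread serveThread([server] { server->serve(); });  // blocks in loopForever()
  // ... the process does its work; on a shutdown signal:
  server->stop();       // terminateLoopSoon() on the serve EventBase
  serveThread.join();   // serve() has returned and cleanUp() has completed
}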

setup

Comments are added to key parts of the code below; large, less important chunks are elided.

setup mainly starts two thread pools: acceptPool and ioThreadPool. acceptPool is used to establish TCP connections, while ioThreadPool responds to client events, e.g. calling read/write on the TCP buffers and figuring out which RPC method is being invoked, without doing any actual business work. ioThreadPool then hands the parsed RPC request over to the worker threads.

void ThriftServer::setup() {
  ensureDecoratedProcessorFactoryInitialized();

  auto nWorkers = getNumIOWorkerThreads();
  DCHECK_GT(nWorkers, 0u);

  addRoutingHandler(
      std::make_unique<apache::thrift::RocketRoutingHandler>(*this));

  // Initialize event base for this thread
  auto serveEventBase = eventBaseManager_->getEventBase();
  serveEventBase_ = serveEventBase;
  if (idleServerTimeout_.count() > 0) {
    idleServer_.emplace(*this, serveEventBase->timer(), idleServerTimeout_);
  }
  // Print some libevent stats
  VLOG(1) << "libevent " << folly::EventBase::getLibeventVersion() << " method "
          << serveEventBase->getLibeventMethod();

  try {
    ...

    // Set the ThriftServer Observer, mainly used to monitor per-request queueing time, processing time, etc.
    if (!getObserver() && server::observerFactory_) {
      setObserver(server::observerFactory_->getObserver());
    }

    // We always need a threadmanager for cpp2.
    // If no worker threads were specified via setThreadManager, this starts a worker thread pool
    setupThreadManager();
    ...

    // serverChannel_ is only set for duplex servers started/stopped via startDuplex/stopDuplex
    // A normal ThriftServer takes the if branch
    if (!serverChannel_) {
      // Configure the TCP connection parameters
      ServerBootstrap::socketConfig.acceptBacklog = getListenBacklog();
      ServerBootstrap::socketConfig.maxNumPendingConnectionsPerWorker =
          getMaxNumPendingConnectionsPerWorker();
      if (reusePort_.value_or(false)) {
        ServerBootstrap::setReusePort(true);
      }
      if (enableTFO_) {
        ServerBootstrap::socketConfig.enableTCPFastOpen = *enableTFO_;
        ServerBootstrap::socketConfig.fastOpenQueueSize = fastOpenQueueSize_;
      }

      ioThreadPool_->addObserver(
          folly::IOThreadPoolDeadlockDetectorObserver::create(
              ioThreadPool_->getName()));

      // Resize the IO pool
      ioThreadPool_->setNumThreads(nWorkers);
      // Start the accept thread pool
      if (!acceptPool_) {
        acceptPool_ = std::make_shared<folly::IOThreadPoolExecutor>(
            nAcceptors_,
            std::make_shared<folly::NamedThreadFactory>("Acceptor Thread"));
      }

      auto acceptorFactory = acceptorFactory_
          ? acceptorFactory_
          : std::make_shared<DefaultThriftAcceptorFactory>(this);
      if (auto factory = dynamic_cast<wangle::AcceptorFactorySharedSSLContext*>(
              acceptorFactory.get())) {
        sharedSSLContextManager_ = factory->initSharedSSLContextManager();
      }
      ServerBootstrap::childHandler(std::move(acceptorFactory));

      // Call into wangle to start acceptPool and ioThreadPool
      {
        std::lock_guard<std::mutex> lock(ioGroupMutex_);
        ServerBootstrap::group(acceptPool_, ioThreadPool_);
      }
      if (socket_) {
        ServerBootstrap::bind(std::move(socket_));
      } else if (!getAddress().isInitialized()) {
        ServerBootstrap::bind(port_);
      } else {
        for (auto& address : addresses_) {
          ServerBootstrap::bind(address);
        }
      }
      // Update address_ with the address that we are actually bound to.
      // (This is needed if we were supplied a pre-bound socket, or if
      // address_'s port was set to 0, so an ephemeral port was chosen by
      // the kernel.)
      ServerBootstrap::getSockets()[0]->getAddress(&addresses_.at(0));

      // we enable zerocopy for the server socket if the
      // zeroCopyEnableFunc_ is valid
      ...
    } else {
      startDuplex();
    }

#if FOLLY_HAS_COROUTINES
    asyncScope_ = std::make_unique<folly::coro::CancellableAsyncScope>();
#endif
    for (auto handler : collectServiceHandlers()) {
      handler->setServer(this);
    }

    DCHECK(
        internalStatus_.load(std::memory_order_relaxed) ==
        ServerStatus::NOT_RUNNING);
    // The server is not yet ready for the user's service methods but fine to
    // handle internal methods. See ServerConfigs::getInternalMethods().
    internalStatus_.store(
        ServerStatus::PRE_STARTING, std::memory_order_release);

    // Notify handler of the preStart event
    for (const auto& eventHandler : getEventHandlersUnsafe()) {
      eventHandler->preStart(&addresses_.at(0));
    }

    internalStatus_.store(ServerStatus::STARTING, std::memory_order_release);

    // Called after setup
    callOnStartServing();

    // After the onStartServing hooks have finished, we are ready to handle
    // requests, at least from the server's perspective.
    internalStatus_.store(ServerStatus::RUNNING, std::memory_order_release);

#if FOLLY_HAS_COROUTINES
    // Set up polling for PolledServiceHealth handlers if necessary
    ...
#endif

    // Notify handler of the preServe event
    for (const auto& eventHandler : getEventHandlersUnsafe()) {
      eventHandler->preServe(&addresses_.at(0));
    }

    // Do not allow setters to be called past this point until the IO worker
    // threads have been joined in stopWorkers().
    configMutable_ = false;
  } catch (std::exception& ex) {
    // This block allows us to investigate the exception using gdb
    LOG(ERROR) << "Got an exception while setting up the server: " << ex.what();
    handleSetupFailure();
    throw;
  } catch (...) {
    handleSetupFailure();
    throw;
  }

  THRIFT_SERVER_EVENT(serve).log(*this);
}

tear-down

The main interfaces involved in shutdown, along with the destructor, are:

  • stop
  • cleanUp
  • ~ThriftServer()

Shutdown mainly boils down to calling a few functions (stopWorkers, stopCPUWorkers, stopAcceptingAndJoinOutstandingRequests). The logic is rather tangled and also depends on a couple of flags; a diagram follows later:

  • stopWorkersOnStopListening_ (default is true)
  • joinRequestsWhenServerStops_ (default is true)

Let's first look at what each of these functions does during shutdown, and then work out where each of them gets called.

  1. stopWorkers

Its job is to stop acceptPool_ and ioThreadPool_. ServerBootstrap is implemented in wangle.

void ThriftServer::stopWorkers() {
  if (serverChannel_) {
    return;
  }
  DCHECK(!duplexWorker_);
  // Stop listening on the sockets
  ServerBootstrap::stop();
  // Join the acceptPool_ and ioThreadPool_ that were set up at start
  ServerBootstrap::join();
  // Once configMutable_ is true, setup() can be called again
  configMutable_ = true;
}

Both thread pools are IOThreadPoolExecutors, and both eventually call ThreadPoolExecutor::join. Note that join only takes the threads currently in threadList_, calls stopThreads on them, and removes them from threadList_; it does not prevent new threads from being started after join has returned.

void ThreadPoolExecutor::join() {
  joinKeepAliveOnce();
  size_t n = 0;
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    maxThreads_.store(0, std::memory_order_release);
    activeThreads_.store(0, std::memory_order_release);
    n = threadList_.get().size();
    removeThreads(n, true);
    n += threadsToJoin_.load(std::memory_order_relaxed);
    threadsToJoin_.store(0, std::memory_order_relaxed);
  }
  joinStoppedThreads(n);
  CHECK_EQ(0, threadList_.get().size());
  CHECK_EQ(0, stoppedThreads_.size());
}

void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
  isJoin_ = isJoin;
  stopThreads(n);
}

void IOThreadPoolExecutor::stopThreads(size_t n) {
  std::vector<ThreadPtr> stoppedThreads;
  stoppedThreads.reserve(n);
  for (size_t i = 0; i < n; i++) {
    const auto ioThread =
        std::static_pointer_cast<IOThread>(threadList_.get()[i]);
    for (auto& o : observers_) {
      o->threadStopped(ioThread.get());
    }
    ioThread->shouldRun = false;
    stoppedThreads.push_back(ioThread);
    std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
    if (ioThread->eventBase) {
      ioThread->eventBase->terminateLoopSoon();
    }
  }
  for (const auto& thread : stoppedThreads) {
    stoppedThreads_.add(thread);
    threadList_.remove(thread);
  }
}
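
One practical consequence of the above, shown as a minimal sketch against plain folly (the exact thread re-creation timing depends on folly's dynamic-thread settings): join() leaves the executor object itself intact, so it can be resized and reused afterwards, which is what allows setup() to be called again once stopWorkers() has reset configMutable_:

#include <folly/executors/IOThreadPoolExecutor.h>

void joinAndReuse() {
  folly::IOThreadPoolExecutor pool(2);
  pool.add([] { /* some IO-loop work */ });
  pool.join();  // stops and joins only the threads currently in threadList_

  // The executor is now empty but still alive: raising maxThreads_ again lets
  // new threads be started for later work, mirroring ThriftServer::setup()'s
  // ioThreadPool_->setNumThreads(nWorkers) after a previous stopWorkers().
  pool.setNumThreads(2);
  pool.add([] { /* handled by a freshly started thread */ });
  pool.join();
}
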
  2. stopCPUWorkers

Its main job is to join all worker threads, namely the threads where ThreadManager::Impl::Worker runs.

void ThriftServer::stopCPUWorkers() {
  // Wait for any tasks currently running on the task queue workers to
  // finish, then stop the task queue workers. Have to do this now, so
  // there aren't tasks completing and trying to write to i/o thread
  // workers after we've stopped the i/o workers.
  threadManager_->join();
#if FOLLY_HAS_COROUTINES
  // Wait for tasks running on AsyncScope to join
  folly::coro::blockingWait(asyncScope_->joinAsync());
  cachedServiceHealth_.store(ServiceHealth{}, std::memory_order_relaxed);
#endif

  // Notify handler of the postStop event
  for (const auto& eventHandler : getEventHandlersUnsafe()) {
    eventHandler->postStop();
  }

#if FOLLY_HAS_COROUTINES
  asyncScope_.reset();
#endif
}

The crucial call here is threadManager_->join();. In our code we use PriorityThreadManager, and the call chain is roughly as follows:

PriorityThreadManager::PriorityImpl::join() -> ThreadManager::Impl::join() -> ThreadManager::Impl::stopImpl

The effect is to make sure that every worker thread hosting a ThreadManager::Impl::Worker exits; a detailed analysis can be found here.
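
For readers less familiar with ThreadManager, here is a tiny lifecycle sketch of what joining the CPU workers means (these are real ThreadManager calls, but the usage is simplified):

#include <thrift/lib/cpp/concurrency/PosixThreadFactory.h>
#include <thrift/lib/cpp/concurrency/ThreadManager.h>

void threadManagerLifecycle() {
  using namespace apache::thrift::concurrency;
  auto tm = ThreadManager::newSimpleThreadManager(4);
  tm->threadFactory(std::make_shared<PosixThreadFactory>());
  tm->start();
  tm->add([] { /* a CPU-bound piece of an RPC handler */ });
  // join() waits for running/queued tasks to finish and then stops every
  // Worker thread -- this is what stopCPUWorkers() triggers via
  // threadManager_->join().
  tm->join();
}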

  3. stopAcceptingAndJoinOutstandingRequests

It calls AsyncServerSocket::stopAccepting, closing all sockets, and then joins all Cpp2Worker threads.

void ThriftServer::stopAcceptingAndJoinOutstandingRequests() {
  {
    auto expected = ServerStatus::PRE_STOPPING;
    if (!internalStatus_.compare_exchange_strong(
            expected,
            ServerStatus::STOPPING,
            std::memory_order_release,
            std::memory_order_relaxed)) {
      // stopListening() was called earlier
      DCHECK(
          expected == ServerStatus::STOPPING ||
          expected == ServerStatus::DRAINING_UNTIL_STOPPED ||
          expected == ServerStatus::NOT_RUNNING);
      return;
    }
  }

  callOnStopServing();

  internalStatus_.store(
      ServerStatus::DRAINING_UNTIL_STOPPED, std::memory_order_release);

  forEachWorker([&](wangle::Acceptor* acceptor) {
    if (auto worker = dynamic_cast<Cpp2Worker*>(acceptor)) {
      worker->requestStop();
    }
  });
  // tlsCredWatcher_ uses a background thread that needs to be joined prior
  // to any further writes to ThriftServer members.
  tlsCredWatcher_.withWLock([](auto& credWatcher) { credWatcher.reset(); });
  sharedSSLContextManager_ = nullptr;

  {
    auto sockets = getSockets();
    folly::Baton<> done;
    SCOPE_EXIT { done.wait(); };
    std::shared_ptr<folly::Baton<>> doneGuard(
        &done, [](folly::Baton<>* done) { done->post(); });

    for (auto& socket : sockets) {
      // We should have already paused accepting new connections. This just
      // closes the sockets once and for all.
      auto eb = socket->getEventBase();
      eb->runInEventBaseThread([socket = std::move(socket), doneGuard] {
        // This will also cause the workers to stop
        socket->stopAccepting();
      });
    }
  }

  auto joinDeadline =
      std::chrono::steady_clock::now() + getWorkersJoinTimeout();
  bool dumpSnapshotFlag = THRIFT_FLAG(dump_snapshot_on_long_shutdown);

  forEachWorker([&](wangle::Acceptor* acceptor) {
    if (auto worker = dynamic_cast<Cpp2Worker*>(acceptor)) {
      if (!worker->waitForStop(joinDeadline)) {
        // ...
        // If a Cpp2Worker has not exited within the deadline, dump some info and then stop waiting
      }
    }
  });

  // Clear the decorated processor factory so that it's re-created if the server
  // is restarted.
  // Note that duplex servers drain connections in the destructor so we need to
  // keep the AsyncProcessorFactory alive until then. Duplex servers also don't
  // support restarting the server so extending its lifetime should not cause
  // issues.
  if (!isDuplex()) {
    decoratedProcessorFactory_.reset();
  }

  internalStatus_.store(ServerStatus::NOT_RUNNING, std::memory_order_release);
}

stop

This simply stops the EventBase that was set up in setup (the one serve loops on).

void ThriftServer::stop() {
  folly::EventBase* eventBase = serveEventBase_;
  if (eventBase) {
    eventBase->terminateLoopSoon();
  }
}

cleanUp

Throughout the cleanUp flow, the paths through which stopWorkers and stopAcceptingAndJoinOutstandingRequests get called are remarkably convoluted and depend on those flags; a diagram is drawn below, so the code itself is not walked through in detail.

void ThriftServer::cleanUp() {
  DCHECK(!serverChannel_);

  // tlsCredWatcher_ uses a background thread that needs to be joined prior
  // to any further writes to ThriftServer members.
  tlsCredWatcher_.withWLock([](auto& credWatcher) { credWatcher.reset(); });

  // It is users duty to make sure that setup() call
  // should have returned before doing this cleanup
  idleServer_.reset();
  serveEventBase_ = nullptr;
  stopListening();

  // Stop the routing handlers.
  for (auto& handler : routingHandlers_) {
    handler->stopListening();
  }

  if (stopWorkersOnStopListening_) {
    // Wait on the i/o worker threads to actually stop
    stopWorkers();
  } else if (joinRequestsWhenServerStops_) {
    stopAcceptingAndJoinOutstandingRequests();
  }

  for (auto handler : getProcessorFactory()->getServiceHandlers()) {
    handler->setServer(nullptr);
  }

  // Now clear all the handlers
  routingHandlers_.clear();
}

~ThriftServer()

ThriftServer::~ThriftServer() {
  tracker_.reset();
  if (duplexWorker_) {
    // usually ServerBootstrap::stop drains the workers, but ServerBootstrap
    // doesn't know about duplexWorker_
    duplexWorker_->drainAllConnections();

    LOG_IF(ERROR, !duplexWorker_.unique())
        << getActiveRequests() << " active Requests while in destructing"
        << " duplex ThriftServer. Consider using startDuplex & stopDuplex";
  }

  if (stopWorkersOnStopListening_) {
    // Everything is already taken care of.
    return;
  }
  // If the flag is false, neither i/o nor CPU workers are stopped at this
  // point. Stop them now.
  if (!joinRequestsWhenServerStops_) {
    stopAcceptingAndJoinOutstandingRequests();
  }
  stopCPUWorkers();
  stopWorkers();
}

stopListening calls AsyncServerSocket::pauseAccepting, which unregisters every EventHandler bound to each socket (it does not close the sockets, whereas AsyncServerSocket::stopAccepting both unregisters all EventHandlers and closes all the sockets), so no further events are processed.

void ThriftServer::stopListening() {
  // We have to make sure stopListening() is not called twice when both
  // stopListening() and cleanUp() are called
  {
    auto expected = ServerStatus::RUNNING;
    if (!internalStatus_.compare_exchange_strong(
            expected,
            ServerStatus::PRE_STOPPING,
            std::memory_order_release,
            std::memory_order_relaxed)) {
      // stopListening() was called earlier
      DCHECK(
          expected == ServerStatus::PRE_STOPPING ||
          expected == ServerStatus::STOPPING ||
          expected == ServerStatus::DRAINING_UNTIL_STOPPED ||
          expected == ServerStatus::NOT_RUNNING);
      return;
    }
  }

#if FOLLY_HAS_COROUTINES
  asyncScope_->requestCancellation();
#endif

  {
    auto sockets = getSockets();
    folly::Baton<> done;
    SCOPE_EXIT { done.wait(); };
    std::shared_ptr<folly::Baton<>> doneGuard(
        &done, [](folly::Baton<>* done) { done->post(); });

    for (auto& socket : sockets) {
      // Stop accepting new connections
      auto eb = socket->getEventBase();
      eb->runInEventBaseThread([socket = std::move(socket), doneGuard] {
        socket->pauseAccepting();
      });
    }
  }

  if (stopWorkersOnStopListening_) {
    stopAcceptingAndJoinOutstandingRequests();
    stopCPUWorkers();
  }
}

Call relationships

The shutdown interfaces and the threads to be joined are summarized in the figure below: black edges are unconditional calls, while colored edges are calls made only when the corresponding condition holds.

figure

Let's go through the cases one by one. Since both cleanUp and the destructor may call these functions, each case lists two chains. cleanUp is always called before the destructor.

  1. stopWorkersOnStopListening_ = true, joinRequestsWhenServerStops_ = true (the default values)

Call order:

  `cleanUp` -> `stopListening` -> `stopAcceptingAndJoinOutstandingRequests` -> `stopCPUWorkers` -> `stopWorkers`

  `~ThriftServer` -> `stopCPUWorkers` -> `stopWorkers`

  2. stopWorkersOnStopListening_ = false, joinRequestsWhenServerStops_ = true

Call order:

  `cleanUp` -> `stopListening` -> `stopAcceptingAndJoinOutstandingRequests`

  `~ThriftServer` -> `stopCPUWorkers` -> `stopWorkers`

  3. stopWorkersOnStopListening_ = true, joinRequestsWhenServerStops_ = false

Call order:

  `cleanUp` -> `stopListening` -> `stopAcceptingAndJoinOutstandingRequests` -> `stopCPUWorkers` -> `stopWorkers`

  `~ThriftServer` -> `stopAcceptingAndJoinOutstandingRequests` -> `stopCPUWorkers` -> `stopWorkers`

  4. stopWorkersOnStopListening_ = false, joinRequestsWhenServerStops_ = false

Call order:

  `cleanUp` -> `stopListening`

  `~ThriftServer` -> `stopAcceptingAndJoinOutstandingRequests` -> `stopCPUWorkers` -> `stopWorkers`

Looking across these four cases, stopWorkers, stopCPUWorkers, and stopAcceptingAndJoinOutstandingRequests may each be called more than once. Since cleanUp always runs before the destructor, the first invocations of these three functions always follow this order:

stopAcceptingAndJoinOutstandingRequests -> stopCPUWorkers -> stopWorkers

In other words, when a ThriftServer shuts down or is destructed, it always first closes the sockets and joins all Cpp2Workers, then joins all ThreadManager workers, and finally shuts down acceptPool_ and ioThreadPool_.
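
Tying this back to the problem in the opening paragraph, here is a sketch of a process-level shutdown order that respects the above (illustrative names such as servers, serveThreads, KvEngine; not a drop-in recipe): stop all ThriftServers and wait for their cleanUp() to finish before destroying anything the handlers call into, and release the shared pools last. Note that with shared pools and the default flags, the first server's cleanUp() already joins the shared ThreadManager via stopCPUWorkers, so by that point every server should have stopped taking new work.

// Sketch (illustrative names, not a drop-in recipe; assumes the includes from
// the earlier sketches): stop RPC traffic first, then destroy the components
// handlers depend on, then release the shared pools.
void shutdownProcess(
    std::vector<std::shared_ptr<apache::thrift::ThriftServer>>& servers,
    std::vector<std::thread>& serveThreads,
    std::shared_ptr<KvEngine>& kvEngine,  // hypothetical component
    std::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager,
    std::shared_ptr<folly::IOThreadPoolExecutor>& ioPool) {
  for (auto& server : servers) {
    server->stop();            // ask each serve() EventBase loop to exit
  }
  for (auto& t : serveThreads) {
    t.join();                  // serve() returned; with default flags cleanUp()
  }                            // has joined Cpp2Workers and ThreadManager workers
  kvEngine.reset();            // safe now: no RPC worker can call into it
  threadManager.reset();       // shared CPU pool released last
  ioPool.reset();              // shared IO pool released last
}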

Done~