Jason Pan

gRPC doc-core Reading Notes

潘忠显 / 2021-08-10


If multiple threads simultaneously execute functions that operate on the same critical section (possibly different functions, but guarded by the same lock), some of the threads will block, which can have undesirable effects.

The article proposes that, when certain conditions are met, a trick can be used to avoid blocking so that all threads keep making progress. These conditions include:

Quite a few scenarios satisfy these conditions, e.g., sending network requests and writing logs.

One point in the given example needs to be understood correctly: q.push() returns true only on the genuinely first call, i.e., when the pushed element is truly the first element placed in the queue. It is not the case that q.push() returns true whenever the queue has just become empty; that reading would still allow multiple threads to execute the function at the same time.

if (q.push(f)) {
  // q.push returns true if it's the first thing
  while (q.pop(&f)) {  // modulo some extra work to avoid races
    f();
  }
}

In fact, motivated by batching writes, the real wrapper does more work than this.

Why a polling engine is needed:

grpc_endpoint

Contents of a closure

/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
  /** Once queued, next indicates the next queued closure; before then, scratch
   *  space */
  union {
    grpc_closure* next;
    grpc_core::ManualConstructor<
        grpc_core::MultiProducerSingleConsumerQueue::Node>
        mpscq_node;
    uintptr_t scratch;
  } next_data;

  /** Bound callback. */
  grpc_iomgr_cb_func cb;

  /** Arguments to be passed to "cb". */
  void* cb_arg;

  /** Once queued, the result of the closure. Before then: scratch space */
  union {
    grpc_error_handle error;
    uintptr_t scratch;
  } error_data;
};

Which Polling Engine is used is determined by the environment; on Linux, for example, gRPC picks among epollex, epoll1, and poll depending on kernel support.

How to confirm which Polling Engine is being used?

Raise the log level to DEBUG and the relevant information will be shown:

GRPC_VERBOSITY=DEBUG bazel run //examples/cpp/helloworld:greeter_server 

For example, in an environment with kernel v3.10.107-1 and libc 2.17, running the command above shows that epoll1 is used rather than epollex:

D0810 17:00:00.516485761    8302 is_epollexclusive_available.cc:86] epoll_ctl with EPOLLEXCLUSIVE | EPOLLONESHOT succeeded. This is evidence of no EPOLLEXCLUSIVE support. Not using epollex polling engine.
I0810 17:00:00.516645971    8302 ev_epollex_linux.cc:1635]   Skipping epollex because it is not supported.
I0810 17:00:00.516669710    8302 ev_epoll1_linux.cc:122]     grpc epoll fd: 3
D0810 17:00:00.516696052    8302 ev_posix.cc:173]            Using polling engine: epoll1

grpc_pollset_worker

grpc_pollset

grpc_pollset_set

grpc_fd

grpc_closure

Is there only one grpc_pollset globally?

A grpc_pollset_set has a parent grpc_pollset_set

grpc_pollset_set_vtable

The same one

A function executing in some thread

learn-grpc/polling-engine/case01.cc: In function 'int main()':
learn-grpc/polling-engine/case01.cc:4:11: error: aggregate 'grpc_fd fd' has incomplete type and cannot be defined
    4 |   grpc_fd fd;
typedef struct grpc_fd grpc_fd;  // only this forward declaration is visible to users

Opaque Structures exposed by the polling engine

They are called "opaque" because:


// t.cc
#include "t.h"
class T {
  int i;
};
// t.h
#ifndef T_HEADER_GUARD_
#define T_HEADER_GUARD_
class T;
#endif
// m.cc
#include "t.h"

int main() {
  T* t_ptr;
  T t;
  return t_ptr->i;
}
> g++  t.cc m.cc -I ./
m.cc: In function ‘int main()’:
m.cc:5:5: error: aggregate ‘T t’ has incomplete type and cannot be defined
    5 |   T t;
      |     ^
m.cc:6:15: error: invalid use of incomplete type ‘class T’
    6 |   return t_ptr->i;
      |               ^~
In file included from m.cc:1:
t.h:3:7: note: forward declaration of ‘class T’
    3 | class T;
      |       ^

Vocabulary:

a bunch of: 一堆
opaque: 不透明的
recap: 回顾
closure: 闭包

Why a custom MemoryAllocator needs to be defined

The MemoryAllocator provides memory for EventEngine read/write operations, allowing EventEngines to seamlessly participate in the C++ ResourceQuota system. Methods like New and MakeUnique provide easy means of object construction, while Reserve and Release let EventEngines report bytes that were allocated elsewhere.

Caveat: This is not final, feedback is welcome!

A few notes based on a quick first run through

EventEngine

Handles I/O, task execution, and DNS resolution.

https://github.com/grpc/proposal/pull/245

Recently being revised: https://github.com/drfloob/proposal/blob/event-engine/L82-core-event-engine.md

siloing events

iomgr, the current event-management framework inside gRPC core, has different implementations depending on the platform.

Concepts

The EventEngine consists of several parts

Current event engine: "borrows" threads from the application

New event engine: has its own polling and callback threads

Purpose of the API: integrating gRPC with an external event engine

arena

single_buf

CallOpSet

In async mode, Finish() only sets up finish_buf and then issues the asynchronous operation.

finish_buf is a CallOpSet:

*finish_buf_ptr = finish_buf;
finish_buf->set_output_tag(tag);
finish_buf->RecvMessage(static_cast<R*>(msg));
finish_buf->AllowNoMessage();
finish_buf->ClientRecvStatus(context, status);
call->PerformOps(finish_buf);

void PerformOps(CallOpSetInterface* ops) {
  call_hook_->PerformOpsOnCall(ops, this);
}

call_hook_

channel_


internal::Call* call, call_hook_

It is when PerformOps is called that the types inside the CallOpSet come into play.


grpc_call_ref

Ref a call

if (RunInterceptors()) {
  ContinueFillOpsAfterInterception();
} else {
  // After the interceptors are run, ContinueFillOpsAfterInterception will
  // be run
}

this->Op1::SetInterceptionHookPoint(&interceptor_methods_);

CallHook

The actual send happens in Finish().


Specifically, it happens in call->PerformOps:

// This call will go through interceptors and would need to
// schedule new batches, so delay completion queue shutdown
call_.cq()->RegisterAvalanching();
return interceptor_methods_.RunInterceptors();

InterceptionHookPoints

grpc_call_ref