gRPC doc-core 阅读笔记
潘忠显 / 2021-08-10
如果多个线程同时执行操作同一临界区的函数(可能是不同的函数,但是使用同一个锁),会造成其中一个线程被阻塞,这可能带来一些不良影响。
文中提出在满足一些条件的情况下,可以通过技巧来避免阻塞线程而让所有线程一直执行。这些条件包括:
- 允许函数在调用他的线程之外的线程中运行
- 可以非同步的调用(不需要立即返回结果)
符合这种条件的场景还是蛮多的,比如发送网络请求、写日志等。
给的例子中,有一点需要理解:q.push()
返回 true
是在真正第一次被调用,既元素为被放入的真正第一个元素的时候,而不是队列空了之后q.push()
就会返回 true
。后边的理解方式仍然会造成多个线程同时执行函数。
if (q.push(f)) {
// q.push returns true if it's the first thing
while (q.pop(&f)) { // modulo some extra work to avoid races
f();
}
}
而实际上,出于批量写的动机,这个封装做了更多的工作。
需要 polling engine 的原因:
grpc_endpoint
闭包的内容
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
/** Once queued, next indicates the next queued closure; before then, scratch
* space */
union {
grpc_closure* next;
grpc_core::ManualConstructor<
grpc_core::MultiProducerSingleConsumerQueue::Node>
mpscq_node;
uintptr_t scratch;
} next_data;
/** Bound callback. */
grpc_iomgr_cb_func cb;
/** Arguments to be passed to "cb". */
void* cb_arg;
/** Once queued, the result of the closure. Before then: scratch space */
union {
grpc_error_handle error;
uintptr_t scratch;
} error_data;
使用何种 Polling Engine 是由环境决定的,比如 Linux 上
epollex
(default but requires kernel version >= 4.5),epoll1
(Ifepollex
is not available and glibc version >= 2.9)poll
(If kernel does not have epoll support)
如何确认使用的是那种 Polling Engine?
将日志级别调到 DEBUG 级别,会展示相应的信息
GRPC_VERBOSITY=DEBUG bazel run //examples/cpp/helloworld:greeter_server
比如我在 Kernel v3.10.107-1 + libc 2.17 的环境中,执行上边的指令,显示使用的是 epoll1 而不是 epollex:
D0810 17:00:00.516485761 8302 is_epollexclusive_available.cc:86] epoll_ctl with EPOLLEXCLUSIVE | EPOLLONESHOT succeeded. This is evidence of no EPOLLEXCLUSIVE support. Not using epollex polling engine.
I0810 17:00:00.516645971 8302 ev_epollex_linux.cc:1635] Skipping epollex because it is not supported.
I0810 17:00:00.516669710 8302 ev_epoll1_linux.cc:122] grpc epoll fd: 3
D0810 17:00:00.516696052 8302 ev_posix.cc:173] Using polling engine: epoll1
grpc_pollset_worker
grpc_pollset
grpc_pollset_set
grpc_fd
grpc_closure
全局只有一个 grpc_pollset 吗?
grpc_pollset_set 有父 grpc_pollset_set
grpc_pollset_set_vtable
同一个
在某个线程中执行的一个函数
learn-grpc/polling-engine/case01.cc: In function 'int main()':
learn-grpc/polling-engine/case01.cc:4:11: error: aggregate 'grpc_fd fd' has incomplete type and cannot be defined
4 | grpc_fd fd;
typedef struct grpc_fd grpc_fd;
Opaque Structures exposed by the polling engine
之所以说是 “Opaque“ 是因为:
- 这些对象的定义都是在 .cc 的源文件中,只有内部可以使用
- 外部只知道有这个符号,没有方法访问
- 可以使用该结构的指针
- 提供了
grpc_fd_create()
等函数进行创建
// t.cc
#include "t.h"
class T {
int i;
};
// t.h
#ifndef T_HEADER_GUARD_
#define T_HEADER_GUARD_
class T;
#endif
// m.cc
#include "t.h"
int main() {
T* t_ptr;
T t;
return 0;
}
> g++ t.cc m.cc -I ./
m.cc: In function ‘int main()’:
m.cc:5:5: error: aggregate ‘T t’ has incomplete type and cannot be defined
5 | T t;
| ^
m.cc:6:15: error: invalid use of incomplete type ‘class T’
6 | return t_ptr->i;
| ^~
In file included from m.cc:1:
t.h:3:7: note: forward declaration of ‘class T’
3 | class T;
| ^
a bunch of 一堆
opaque 不透明的
recap 回顾、重新验证
closure 闭包
为什么需要自己定义 MemoryAllocator
The
MemoryAllocator
provides memory for EventEngine read/write operations, allowing EventEngines to seamlessly participate in the C++ResourceQuota
system. Methods likeNew
andMakeUnique
provide easy means of object construction, whileReserve
andRelease
let EventEngines report bytes that were allocated elsewhere.
Caveat: This is not final, feedback is welcome!
A few notes based on a quick first run through
事件引擎
处理IO 、任务执行、 DNS解析。
https://github.com/grpc/proposal/pull/245
近期正在更换 https://github.com/drfloob/proposal/blob/event-engine/L82-core-event-engine.md
siloing events
当前 grpc core 内部事件管理框架 iomgr,依赖于平台有不同的实现存在。
概念
事件引擎由几部分组成
当前事件引擎:从应用正“借“线程
新事件引擎:有自己的polling和回调线程
API的作用,将gRPC 集成到外部事件引擎
arena
single_buf
CallOpSet
异步模式下,finish 的作用,只是设置finish_buf,然后调用异步操作
finish_buf 是个 CallOpSet
*finish_buf_ptr = finish_buf;
finish_buf->set_output_tag(tag);
finish_buf->RecvMessage(static_cast<R*>(msg));
finish_buf->AllowNoMessage();
finish_buf->ClientRecvStatus(context, status);
call->PerformOps(finish_buf);
void PerformOps(CallOpSetInterface* ops) {
call_hook_->PerformOpsOnCall(ops, this);
}
call_hook_
channel_
internal::Call* call, call_hook_
在调用 PerformOps 的时候就会用到 CallOpSet 中的类型了。
grpc_call_ref
Ref a call
if (RunInterceptors()) {
ContinueFillOpsAfterInterception();
} else {
// After the interceptors are run, ContinueFillOpsAfterInterception will
// be run
}
this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
CallHook
真实的发送动作,发生在 Finish() 中
具体的就是在 call->PerformOps中
// This call will go through interceptors and would need to
// schedule new batches, so delay completion queue shutdown
call_.cq()->RegisterAvalanching();
return interceptor_methods_.RunInterceptors();
InterceptionHookPoints
grpc_call_ref