今天要修改下框架方面的东西,把接口请求返回的默认上报,过滤掉外部拼凑的url,想得是把router的map传进去,未命中的就不做上报,看到grpc的时候发现其并没有像http一样维护一个grpc的方法map,对grpc的原理之前有一些了解,今天希望能更进一步详细,了解下grpc注册和发现相关的逻辑。
grpc的服务端注册
- 首先生成srv grpcserver(google的官方sever)
- 通过pb.go生成的Register**Server,绑定实现了pb.go定义的接口的服务类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49//绑定实现类到 生成的server里,同时注册serviceDesc到
func RegisterNewsPoolServer(s *grpc.Server, srv NewsPoolServer) {
s.RegisterService(&_NewsPool_serviceDesc, srv)
}
//serviceDesc 如下也是服务端收到请求能够打到具体方法的核心配置,相当于httpserver里面的路由信息
var _NewsPool_serviceDesc = grpc.ServiceDesc{
ServiceName: "content.NewsPool",
HandlerType: (*NewsPoolServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "GetNewsContent",
Handler: _NewsPool_GetNewsContent_Handler,
},
{
MethodName: "UpdateNewsContent",
Handler: _NewsPool_UpdateNewsContent_Handler,
},
{
MethodName: "DelNewsContent",
Handler: _NewsPool_DelNewsContent_Handler,
},
{
MethodName: "GetVideoContent",
Handler: _NewsPool_GetVideoContent_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "newspool.proto",
}
//其中一个handler定义如下
func _NewsPool_GetNewsContent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(NewsPoolRequest)
if err := dec(in); err != nil {
return nil, err
}
//interceptor 直接断言绑定的srv结构体调用其函数
if interceptor == nil {
return srv.(NewsPoolServer).GetNewsContent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/content.NewsPool/GetNewsContent",
}
//interceptor 把具体srv处理函数封装成handler传入interceptor中,可以在interceptor中做请求的预处理和其它操作,我们框架里面的熔断限流和一些统一操作也是在这里进行处理的
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(NewsPoolServer).GetNewsContent(ctx, req.(*NewsPoolRequest))
}
return interceptor(ctx, in, info, handler)
} - srv的实现绑定和配置绑定ok后,启动srv.server()会把服务启动接受请求启动协程进行处理。我们看下具实现:我们来简单分析一下 下面是部分代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
s.serve = true
//server的lisMap如果是空就直接关闭lis
if s.lis == nil {
// Serve called after Stop or GracefulStop.
s.mu.Unlock()
lis.Close()
return ErrServerStopped
}
s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
select {
// Stop or GracefulStop called; block until done and return nil.
case <-s.quit:
<-s.done
default:
}
}()
ls := &listenSocket{Listener: lis}
s.lis[ls] = true
if channelz.IsOn() {
ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
}
s.mu.Unlock()
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
ls.Close()
delete(s.lis, ls)
}
s.mu.Unlock()
}()
var tempDelay time.Duration // how long to sleep on accept failure
for {
//接受连接
rawConn, err := lis.Accept()
tempDelay = 0
// Start a new goroutine to deal with rawConn so we don't stall this Accept
// loop goroutine.
//
// Make sure we account for the goroutine so GracefulStop doesn't nil out
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
//开启协程处理连接流
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
} - 最后一步请求路由到相应的hanlder,例如 上面的:Handler: _NewsPool_GetVideoContent_Handler (依然只截取部分代码)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
//获取要调用的方法
sm := stream.Method()
if sm != "" && sm[0] == '/' {
sm = sm[1:]
}
pos := strings.LastIndex(sm, "/")
//省略了些错误处理
...
service := sm[:pos]
method := sm[pos+1:]
//获取到回应的md,在processUnaryRPC中调用md.method 就是注册的_NewsPool_GetVideoContent_Handler
if srv, ok := s.m[service]; ok {
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.sd[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
// Unknown service, or known server unknown method.
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("unknown service %v", service)
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
}
}
//进入到这个函数就可以看到,handler就是我们实现的getNewsContent函数,其实如果定义了interceptor会在interceptor中执行hander,做一些通一的操作。
func _NewsPool_GetNewsContent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(NewsPoolRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(NewsPoolServer).GetNewsContent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/content.NewsPool/GetNewsContent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(NewsPoolServer).GetNewsContent(ctx, req.(*NewsPoolRequest))
}
return interceptor(ctx, in, info, handler)
}
grpc的服务注册,启动,监听和路由到对应的实现就完成了,下一篇看下grpclient的建立,怎样把一个grpc请求转化为具体的调用,这样就把grpc的实现完整的了解啦