gRPC 与 Protobuf 实战指南
引言
gRPC 是 Google 开源的高性能 RPC 框架,而 Protobuf(Protocol Buffers)则是其默认的序列化协议。两者结合带来了高性能、跨语言、契约优先的现代微服务通信方案。
传统的 REST API 使用 JSON 或 XML 作为数据格式,存在以下问题:
体积大:JSON 文本格式冗余
解析慢:需要解析字符串
无强类型:字段变化不易发现
代码生成弱:缺乏好的工具链
gRPC + Protobuf 通过二进制格式和代码生成很好地解决了这些问题。本文将深入探讨 Protobuf 语法、gRPC 服务开发、以及生产环境中的最佳实践。
一、Protobuf 语法详解
1.1 消息定义基础
Protobuf 的核心是.proto文件,它定义消息的结构:
syntax = "proto3"; // 指定 protobuf 版本 package user; // 包名,用于避免命名冲突 // 定义用户消息 message User { string name = 1; // 字段名 = 字段编号 int32 id = 2; // 编号必须唯一,用于二进制编码 string email = 3; bool active = 4; int64 created_at = 5; }字段编号规则:
1-15:常用字段,使用一个字节编码
16-2047:非常用字段,使用两个字节编码
19000-19999:保留编号,系统使用
建议将 1-15 分配给最常用的字段
1.2 标量数据类型
Protobuf 支持丰富的数据类型:
message TypesDemo { // 整数类型 int32 var_int32 = 1; // 变长有符号整数 int64 var_int64 = 2; // 变长有符号长整数 uint32 var_uint32 = 3; // 变长无符号整数 uint64 var_uint64 = 4; // 变长无符号长整数 sint32 var_sint32 = 5; // 变长有符号整数(负数效率更高) sint64 var_sint64 = 6; // 变长有符号长整数 // 固定长度类型 fixed32 fixed32 = 7; // 固定4字节无符号整数 fixed64 fixed64 = 8; // 固定8字节无符号整数 sfixed32 sfixed32 = 9; // 固定4字节有符号整数 sfixed64 sfixed64 = 10; // 固定8字节有符号整数 // 浮点数类型 float float_val = 11; // 32位浮点数 double double_val = 12; // 64位浮点数 // 布尔和字符串 bool bool_val = 13; string string_val = 14; bytes bytes_val = 15; }1.3 嵌套与组合
// 嵌套消息 message User { message Address { string street = 1; string city = 2; string country = 3; } string name = 1; Address address = 2; // 使用嵌套消息 repeated Phone phones = 3; // 数组/列表 } // 枚举类型 message Order { enum Status { UNKNOWN = 0; // 枚举必须从 0 开始 PENDING = 1; PAID = 2; SHIPPED = 3; DELIVERED = 4; CANCELLED = 5; } string order_id = 1; Status status = 2; User buyer = 3; repeated Item items = 4; }1.4 Map 类型
message Product { // 键值对映射 map<string, string> attributes = 1; map<int64, User> related_users = 2; }1.5 oneof 联合类型
当一个字段可以是多种类型之一时,使用oneof:
message Response { oneof result { User user = 1; Order order = 2; string error = 3; } int64 timestamp = 4; }二、proto 文件编译与代码生成
2.1 安装 protoc
Windows 环境安装 protoc:
# 下载 protoc(从 GitHub releases) curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v25.1/protoc-25.1-win64.zip unzip protoc-25.1-win64.zip -d $HOME/.local # 设置 PATH export PATH="$PATH:$HOME/.local/bin" # 验证安装 protoc --version
2.2 安装 Go 插件
# 安装 protoc-gen-go(用于生成 .pb.go 文件) go install google.golang.org/protobuf/cmd/protoc-gen-go@latest # 安装 protoc-gen-go-grpc(用于生成 gRPC 服务代码) go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest # 设置 PATH export PATH="$PATH:$(go env GOPATH)/bin"
2.3 编译 proto 文件
目录结构:
project/ ├── proto/ │ ├── user.proto │ └── order.proto ├── generated/ └── main.go
编译命令:
protoc \ --go_out=generated \ --go_opt=paths=source_relative \ --go-grpc_out=generated \ --go-grpc_opt=paths=source_relative \ proto/*.proto
生成的代码结构:
// user.pb.go - 消息类型定义 type User struct { Name string Id int32 Email string Active bool CreatedAt int64 // ... 序列化/反序列化方法 } // user_grpc.pb.go - gRPC 服务定义 type UserServiceClient interface { GetUser(ctx context.Context, in *GetUserRequest, opts ...grpc.CallOption) (*User, error) ListUsers(ctx context.Context, in *ListUsersRequest, opts ...grpc.CallOption) (*ListUsersResponse, error) // ... }2.4 完整 proto 示例
创建proto/user.proto:
syntax = "proto3"; package user; option go_package = "github.com/example/project/gen/user;user"; import "google/protobuf/timestamp.proto"; // 用户服务定义 service UserService { // 获取单个用户 rpc GetUser(GetUserRequest) returns (User); // 列出用户(支持分页) rpc ListUsers(ListUsersRequest) returns (ListUsersResponse); // 创建用户 rpc CreateUser(CreateUserRequest) returns (User); // 更新用户 rpc UpdateUser(UpdateUserRequest) returns (User); // 删除用户 rpc DeleteUser(DeleteUserRequest) returns (Empty); // 双向流示例:批量操作 rpc BatchProcessUsers(stream User) returns (stream OperationResult); } // 用户消息定义 message User { int32 id = 1; string name = 2; string email = 3; bool active = 4; google.protobuf.Timestamp created_at = 5; repeated string roles = 6; } // 请求消息定义 message GetUserRequest { int32 id = 1; } message ListUsersRequest { int32 page = 1; int32 page_size = 2; string search = 3; } message ListUsersResponse { repeated User users = 1; int32 total = 2; int32 page = 3; int32 page_size = 4; } message CreateUserRequest { string name = 1; string email = 2; repeated string roles = 3; } message UpdateUserRequest { int32 id = 1; string name = 2; string email = 3; bool active = 4; } message DeleteUserRequest { int32 id = 1; } // 通用响应 message Empty {} // 批量操作结果 message OperationResult { int32 id = 1; bool success = 2; string message = 3; }三、gRPC 服务端开发
3.1 项目结构
grpc-demo/ ├── proto/ │ └── user.proto ├── gen/ │ ├── user.pb.go │ └── user_grpc.pb.go ├── server/ │ └── main.go ├── client/ │ └── main.go └── go.mod
3.2 服务端实现
package main import ( "context" "fmt" "io" "log" "net" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" pb "github.com/example/grpc-demo/gen/user" "google.golang.org/protobuf/types/known/timestamppb" ) type UserServer struct { pb.UnimplementedUserServiceServer // 数据库模拟 users map[int32]*pb.User nextID int32 } func NewUserServer() *UserServer { return &UserServer{ users: make(map[int32]*pb.User), nextID: 1, } } func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) { // 从 metadata 获取调用者信息 if md, ok := metadata.FromIncomingContext(ctx); ok { log.Printf("GetUser called by: %v", md.Get("client-id")) } user, ok := s.users[req.Id] if !ok { return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id) } return user, nil } func (s *UserServer) ListUsers(ctx context.Context, req *pb.ListUsersRequest) (*pb.ListUsersResponse, error) { var users []*pb.User for _, user := range s.users { // 简单搜索过滤 if req.Search != "" { if user.Name != req.Search && user.Email != req.Search { continue } } users = append(users, user) } // 分页 start := (req.Page - 1) * req.PageSize end := start + req.PageSize if start >= len(users) { users = []*pb.User{} } else if end > len(users) { users = users[start:] } else { users = users[start:end] } return &pb.ListUsersResponse{ Users: users, Total: int32(len(users)), Page: req.Page, PageSize: req.PageSize, }, nil } func (s *UserServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) { // 参数验证 if req.Name == "" { return nil, status.Error(codes.InvalidArgument, "用户名为必填项") } if req.Email == "" { return nil, status.Error(codes.InvalidArgument, "邮箱为必填项") } user := &pb.User{ Id: s.nextID, Name: req.Name, Email: req.Email, Active: true, Roles: req.Roles, CreatedAt: timestamppb.Now(), } s.users[s.nextID] = user s.nextID++ log.Printf("创建用户: ID=%d, Name=%s", user.Id, user.Name) return user, nil } func (s *UserServer) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.User, error) { user, ok := s.users[req.Id] if !ok { return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id) } if req.Name != "" { user.Name = req.Name } if req.Email != "" { user.Email = req.Email } user.Active = req.Active log.Printf("更新用户: ID=%d", user.Id) return user, nil } func (s *UserServer) DeleteUser(ctx context.Context, req *pb.DeleteUserRequest) (*pb.Empty, error) { if _, ok := s.users[req.Id]; !ok { return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id) } delete(s.users, req.Id) log.Printf("删除用户: ID=%d", req.Id) return &pb.Empty{}, nil } // 双向流 RPC 实现 func (s *UserServer) BatchProcessUsers(stream pb.UserService_BatchProcessUsersServer) error { for { user, err := stream.Recv() if err == io.EOF { // 客户端发送完毕,发送响应 return nil } if err != nil { return err } log.Printf("处理用户: ID=%d, Name=%s", user.Id, user.Name) // 模拟处理 result := &pb.OperationResult{ Id: user.Id, Success: true, Message: fmt.Sprintf("用户 %s 处理成功", user.Name), } // 发送响应 if err := stream.Send(result); err != nil { return err } } }3.3 服务启动与注册
func main() { // 创建监听 lis, err := net.Listen("tcp", ":50051") if err != nil { log.Fatalf("监听端口失败: %v", err) } // 创建 gRPC 服务器(可以添加选项) opts := []grpc.ServerOption{ grpc.UnaryInterceptor(unaryServerInterceptor), grpc.StreamInterceptor(streamServerInterceptor), } server := grpc.NewServer(opts...) // 注册服务 userServer := NewUserServer() pb.RegisterUserServiceServer(server, userServer) log.Printf("gRPC 服务启动,监听端口 :50051") // 启动服务 if err := server.Serve(lis); err != nil { log.Fatalf("服务启动失败: %v", err) } } // 单元拦截器示例 func unaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { log.Printf("调用方法: %s", info.FullMethod) // 前置处理 start := time.Now() // 调用实际方法 resp, err := handler(ctx, req) // 后置处理 log.Printf("方法 %s 耗时: %v", info.FullMethod, time.Since(start)) return resp, err } // 流拦截器示例 func streamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { log.Printf("流方法调用: %s", info.FullMethod) return handler(srv, ss) }四、gRPC 客户端开发
4.1 简单客户端
package main import ( "context" "log" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" pb "github.com/example/grpc-demo/gen/user" ) func main() { // 连接 gRPC 服务器 conn, err := grpc.Dial( "localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()), // 测试环境不使用 TLS grpc.WithBlock(), // 阻塞直到连接成功 grpc.WithTimeout(time.Second*10), // 超时设置 ) if err != nil { log.Fatalf("连接服务器失败: %v", err) } defer conn.Close() // 创建客户端 client := pb.NewUserServiceClient(conn) // 调用 GetUser ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1}) if err != nil { log.Printf("获取用户失败: %v", err) } else { log.Printf("获取用户成功: %+v", user) } }4.2 带认证的客户端
// 认证元数据 type Auth struct { Token string } func (a *Auth) GetRequestMetadata(ctx context.Context, urls ...string) (map[string]string, error) { return map[string]string{ "authorization": "Bearer " + a.Token, }, nil } func (a *Auth) RequireTransportSecurity() bool { return false // 测试环境设为 false } // 认证连接示例 func authenticatedClient() (*grpc.ClientConn, error) { creds := &Auth{Token: "your-jwt-token"} return grpc.Dial( "localhost:50051", grpc.WithPerRPCCredentials(creds), ) }4.3 客户端流调用
// 批量创建用户 func batchCreateUsers(client pb.UserServiceClient, users []*pb.User) error { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() stream, err := client.BatchProcessUsers(ctx) if err != nil { return err } // 发送请求流 for _, user := range users { if err := stream.Send(user); err != nil { return err } } // 关闭发送流并接收响应 reply, err := stream.CloseAndRecv() if err != nil { return err } log.Printf("批量处理完成: %+v", reply) return nil }4.4 双向流调用
// 双向流实时通信 func bidirectionalStream(client pb.UserServiceClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := client.BatchProcessUsers(ctx) if err != nil { return err } // 使用两个 goroutine 分别处理发送和接收 var wg sync.WaitGroup wg.Add(2) // 发送协程 go func() { defer wg.Done() for i := 0; i < 10; i++ { user := &pb.User{ Id: int32(i), Name: fmt.Sprintf("User%d", i), } if err := stream.Send(user); err != nil { log.Printf("发送失败: %v", err) return } time.Sleep(100 * time.Millisecond) } stream.CloseSend() }() // 接收协程 go func() { defer wg.Done() for { result, err := stream.Recv() if err == io.EOF { return } if err != nil { log.Printf("接收失败: %v", err) return } log.Printf("收到结果: ID=%d, Success=%t, Message=%s", result.Id, result.Success, result.Message) } }() wg.Wait() return nil }五、元数据与拦截器
5.1 元数据(Metadata)
gRPC 使用 Metadata 在请求中传递额外信息:
// 定义可选的消息头 message ExtraInfo { string trace_id = 1; string span_id = 2; map<string, string> tags = 3; }服务端读取元数据:
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) { // 读取元数据 md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, status.Error(codes.Internal, "无法获取元数据") } // 获取特定字段 traceID := md.Get("x-trace-id") if len(traceID) > 0 { log.Printf("Trace ID: %s", traceID[0]) } // 处理请求... return s.users[req.Id], nil }客户端发送元数据:
func callWithMetadata(client pb.UserServiceClient) { // 创建元数据 md := metadata.Pairs( "x-trace-id", "abc123", "x-client-version", "1.0.0", ) // 创建带元数据的上下文 ctx := metadata.NewOutgoingContext(context.Background(), md) // 调用 client.GetUser(ctx, &pb.GetUserRequest{Id: 1}) }5.2 拦截器实现
Unary 拦截器:
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { log.Printf("==> 收到请求: %s", info.FullMethod) // 添加追踪 md, _ := metadata.FromIncomingContext(ctx) traceID := md.Get("x-trace-id") if len(traceID) > 0 { ctx = context.WithValue(ctx, "trace_id", traceID[0]) } // 调用实际处理函数 resp, err := handler(ctx, req) if err != nil { log.Printf("<== 请求失败: %s, error: %v", info.FullMethod, err) } else { log.Printf("<== 请求成功: %s", info.FullMethod) } return resp, err }Stream 拦截器:
func streamLoggingInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { log.Printf("==> 收到流请求: %s, IsServerStream: %v", info.FullMethod, info.IsServerStream) // 包装原始流以添加日志功能 wrapped := &loggingServerStream{ServerStream: ss} return handler(srv, wrapped) } type loggingServerStream struct { grpc.ServerStream } func (x *loggingServerStream) SendMsg(m interface{}) error { log.Printf("==> 发送消息: %T", m) return x.ServerStream.SendMsg(m) } func (x *loggingServerStream) RecvMsg(m interface{}) error { err := x.ServerStream.RecvMsg(m) if err == nil { log.Printf("<== 收到消息: %T", m) } return err }5.3 认证拦截器
// 简单 Token 认证 func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // 白名单:不需要认证的方法 whitelist := map[string]bool{ "/user.UserService/GetUser": true, } if whitelist[info.FullMethod] { return handler(ctx, req) } // 检查 Token md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, status.Error(codes.Unauthenticated, "缺少元数据") } tokens := md.Get("authorization") if len(tokens) == 0 { return nil, status.Error(codes.Unauthenticated, "缺少认证 Token") } token := strings.TrimPrefix(tokens[0], "Bearer ") if !validateToken(token) { return nil, status.Error(codes.Unauthenticated, "无效的 Token") } return handler(ctx, req) } func validateToken(token string) bool { // 实际应该验证 JWT 或其他 token return token == "valid-token" }六、双向流 RPC 详解
6.1 四种 RPC 类型
gRPC 支持四种 RPC 类型:
1. 一元 RPC (Unary RPC) 客户端 → 服务端 ClientStream → 服务器处理 → ClientStream 2. 客户端流 RPC (Client Streaming RPC) ClientStream → 服务器处理 → ClientStream 客户端发送多个请求,服务器返回一个响应 3. 服务端流 RPC (Server Streaming RPC) ClientStream → 服务器处理 → ClientStream 客户端发送一个请求,服务器返回多个响应 4. 双向流 RPC (Bidirectional Streaming RPC) ClientStream ↔ 服务器处理 ↔ ClientStream 双方都可以发送多个消息
6.2 双向流聊天服务示例
定义 proto:
service ChatService { // 双向流聊天 rpc Chat(stream ChatMessage) returns (stream ChatMessage); } message ChatMessage { string sender = 1; string content = 2; int64 timestamp = 3; }服务端实现:
type ChatServer struct { pb.UnimplementedChatServiceServer clients map[string]pb.ChatService_ChatServer mu sync.Mutex } func (s *ChatServer) Chat(stream pb.ChatService_ChatServer) error { var sender string // 等待第一个消息(用于注册) firstMsg, err := stream.Recv() if err != nil { return err } sender = firstMsg.Sender // 注册客户端 s.mu.Lock() s.clients[sender] = stream s.mu.Unlock() defer func() { // 注销客户端 s.mu.Lock() delete(s.clients, sender) s.mu.Unlock() }() // 启动接收协程 errChan := make(chan error, 1) go func() { for { msg, err := stream.Recv() if err == io.EOF { errChan <- nil return } if err != nil { errChan <- err return } // 广播消息给所有客户端 s.broadcast(msg) } }() // 等待错误 select { case err := <-errChan: return err } } func (s *ChatServer) broadcast(msg *pb.ChatMessage) { s.mu.Lock() defer s.mu.Unlock() for sender, stream := range s.clients { if sender != msg.Sender { // 不发给自己 stream.Send(msg) } } }七、实战案例:微服务通信框架
7.1 项目架构
microservices/ ├── proto/ │ ├── user.proto │ ├── order.proto │ └── product.proto ├── pkg/ │ ├── grpc/ │ │ ├── client.go │ │ ├── server.go │ │ └── interceptor.go │ └── discovery/ │ └── consul.go ├── services/ │ ├── user-service/ │ ├── order-service/ │ └── product-service/ └── go.mod
7.1 通用 gRPC 客户端封装
package grpc import ( "context" "crypto/tls" "fmt" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/resolver" ) // ClientConfig 客户端配置 type ClientConfig struct { Name string Address string Timeout time.Duration TLSCert string Token string MaxRecvMsg int MaxSendMsg int } // Dial 创建 gRPC 连接 func Dial(ctx context.Context, cfg ClientConfig) (*grpc.ClientConn, error) { opts := []grpc.DialOption{ // 超时设置 grpc.WithBlock(), grpc.WithTimeout(cfg.Timeout), } // 凭证设置 if cfg.TLSCert != "" { creds, err := credentials.NewClientTLSFromFile(cfg.TLSCert, "") if err != nil { return nil, fmt.Errorf("加载 TLS 证书失败: %w", err) } opts = append(opts, grpc.WithTransportCredentials(creds)) } else { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } // Token 认证 if cfg.Token != "" { opts = append(opts, grpc.WithPerRPCCredentials(&tokenAuth{Token: cfg.Token})) } // 消息大小限制 if cfg.MaxRecvMsg > 0 { opts = append(opts, grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsg), )) } if cfg.MaxSendMsg > 0 { opts = append(opts, grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(cfg.MaxSendMsg), )) } return grpc.DialContext(ctx, cfg.Address, opts...) } // tokenAuth 实现 Token 认证 type tokenAuth struct { Token string } func (t *tokenAuth) GetRequestMetadata(ctx context.Context, urls ...string) (map[string]string, error) { return map[string]string{ "authorization": "Bearer " + t.Token, }, nil } func (t *tokenAuth) RequireTransportSecurity() bool { return true }7.2 通用服务端封装
package grpc import ( "context" "crypto/tls" "fmt" "net" "time" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/reflection" ) // ServerConfig 服务端配置 type ServerConfig struct { Port int TLSCert string TLSKey string MaxRecvMsg int MaxSendMsg int Interceptors []grpc.UnaryServerInterceptor StreamInts []grpc.StreamServerInterceptor } // Server 通用 gRPC 服务器 type Server struct { cfg ServerConfig server *grpc.Server lis net.Listener } // NewServer 创建 gRPC 服务器 func NewServer(cfg ServerConfig) (*Server, error) { opts := []grpc.ServerOption{ // 消息大小限制 grpc.MaxRecvMsgSize(cfg.MaxRecvMsg), grpc.MaxSendMsgSize(cfg.MaxSendMsg), // Keepalive 设置 grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionIdle: 5 * time.Minute, MaxConnectionAge: 2 * time.Hour, MaxConnectionAgeGrace: 30 * time.Second, Time: 5 * time.Minute, Timeout: 30 * time.Second, }), // 拦截器 grpc.ChainUnaryInterceptor(cfg.Interceptors...), grpc.ChainStreamInterceptor(cfg.StreamInts...), } // TLS 配置 if cfg.TLSCert != "" && cfg.TLSKey != "" { creds, err := credentials.NewServerTLSFromFile(cfg.TLSCert, cfg.TLSKey) if err != nil { return nil, fmt.Errorf("加载 TLS 证书失败: %w", err) } opts = append(opts, grpc.Creds(creds)) } server := grpc.NewServer(opts...) // 注册反射服务(用于 grpcurl 等工具调试) reflection.Register(server) return &Server{ cfg: cfg, server: server, }, nil } // RegisterService 注册 gRPC 服务 func (s *Server) RegisterService(registerFunc func(*grpc.Server)) { registerFunc(s.server) } // Start 启动服务器 func (s *Server) Start(ctx context.Context) error { lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.cfg.Port)) if err != nil { return fmt.Errorf("监听端口失败: %w", err) } s.lis = lis errCh := make(chan error, 1) go func() { errCh <- s.server.Serve(lis) }() select { case err := <-errCh: return err case <-ctx.Done(): s.server.GracefulStop() return nil } } // Stop 停止服务器 func (s *Server) Stop() { s.server.GracefulStop() }7.3 服务发现集成
package discovery import ( "context" "fmt" "github.com/hashicorp/consul/api" "google.golang.org/grpc/resolver" ) // ConsulResolver Consul 服务发现解析器 type ConsulResolver struct { consulClient *api.Client serviceName string scheme string } // NewConsulResolver 创建 Consul 解析器 func NewConsulResolver(consulAddr, serviceName string) (*ConsulResolver, error) { config := api.DefaultConfig() config.Address = consulAddr client, err := api.NewClient(config) if err != nil { return nil, fmt.Errorf("创建 Consul 客户端失败: %w", err) } return &ConsulResolver{ consulClient: client, serviceName: serviceName, scheme: "consul", }, nil } // Build 实现 resolver.Builder 接口 func (r *ConsulResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { return &consulResolver{ consulClient: r.consulClient, serviceName: r.serviceName, cc: cc, }, nil } // Scheme 返回解析器 scheme func (r *ConsulResolver) Scheme() string { return r.scheme } type consulResolver struct { consulClient *api.Client serviceName string cc resolver.ClientConn } func (r *consulResolver) ResolveNow(options resolver.ResolveNowOptions) { r.resolve() } func (r *consulResolver) resolve() { services, _, err := r.consulClient.Health().Service(r.serviceName, "", true, nil) if err != nil { r.cc.ReportError(err) return } var addrs []resolver.Address for _, svc := range services { addrs = append(addrs, resolver.Address{ Addr: fmt.Sprintf("%s:%d", svc.Service.Address, svc.Service.Port), }) } r.cc.UpdateState(resolver.State{Addresses: addrs}) } func (r *consulResolver) Close() {} // Register 注册服务到 Consul func Register(ctx context.Context, consulAddr, serviceName, addr string, port int) error { config := api.DefaultConfig() config.Address = consulAddr client, err := api.NewClient(config) if err != nil { return err } reg := &api.AgentServiceRegistration{ ID: fmt.Sprintf("%s-%s", serviceName, addr), Name: serviceName, Port: port, Address: addr, Check: &api.AgentServiceCheck{ GRPC: fmt.Sprintf("%s:%d", addr, port), Interval: "10s", Timeout: "5s", DeregisterCriticalServiceAfter: "30s", }, } return client.Agent().ServiceRegister(reg) }总结
gRPC + Protobuf 是一套成熟的微服务通信解决方案:
Protobuf 优势:体积小、解析快、类型安全、向前兼容
gRPC 优势:高性能、双工流、代码生成、协议统一
四种 RPC 类型:一元、流式客户端、流式服务端、双向流
元数据与拦截器:实现认证、日志、追踪等横切关注点
服务发现:通过 resolver 实现负载均衡和服务发现
最佳实践:
proto 文件集中管理,统一版本
使用拦截器实现横切关注点
生产环境务必启用 TLS
合理设置消息大小限制
使用流式 API 处理大数据量场景
