go-micro生成一个通用的grpc接口
创建一个基于 go-micro 的通用 gRPC 接口示例。go-micro 是一个流行的 Go 微服务框架,对 gRPC 提供了很好的支持。
以下是一个完整的通用 gRPC 接口实现:
1. 项目结构
plain
复制
grpc-generic/ ├── proto/ │ └── generic.proto # Protocol Buffers 定义 ├── server/ │ └── main.go # 服务端实现 ├── client/ │ └── main.go # 客户端示例 └── go.mod2. Protocol Buffers 定义 (proto/generic.proto)
protobuf
复制
syntax = "proto3"; option go_package = "./proto;generic"; package generic; // 通用请求消息 message GenericRequest { string service = 1; // 服务名称 string method = 2; // 方法名称 bytes payload = 3; // 序列化的请求数据 (JSON/Protobuf) map<string, string> metadata = 4; // 元数据 } // 通用响应消息 message GenericResponse { bool success = 1; // 是否成功 int32 code = 2; // 状态码 string message = 3; // 消息 bytes data = 4; // 响应数据 map<string, string> metadata = 5; // 元数据 } // 通用 gRPC 服务 service GenericService { // 通用调用方法 rpc Call(GenericRequest) returns (GenericResponse); // 流式调用(服务端流) rpc StreamCall(GenericRequest) returns (stream GenericResponse); // 双向流 rpc BidirectionalStream(stream GenericRequest) returns (stream GenericResponse); }3. 编译 Proto 文件
bash
复制
# 安装依赖 go install google.golang.org/protobuf/cmd/protoc-gen-go@latest go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest # 编译(在 proto 目录下执行) protoc --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ generic.proto4. 服务端实现 (server/main.go)
go
复制
package main import ( "context" "encoding/json" "fmt" "log" "time" "go-micro.dev/v4" "go-micro.dev/v4/logger" pb "your-module/proto" ) // GenericServer 实现通用服务 type GenericServer struct{} // Call 处理通用请求 func (s *GenericServer) Call(ctx context.Context, req *pb.GenericRequest, rsp *pb.GenericResponse) error { logger.Infof("Received request: Service=%s, Method=%s", req.Service, req.Method) // 根据 service 和 method 路由到具体处理逻辑 handler, err := GetHandler(req.Service, req.Method) if err != nil { rsp.Success = false rsp.Code = 404 rsp.Message = err.Error() return nil } // 执行处理 result, err := handler(ctx, req.Payload, req.Metadata) if err != nil { rsp.Success = false rsp.Code = 500 rsp.Message = err.Error() return nil } // 序列化响应 data, err := json.Marshal(result) if err != nil { rsp.Success = false rsp.Code = 500 rsp.Message = "marshal error: " + err.Error() return nil } rsp.Success = true rsp.Code = 200 rsp.Message = "success" rsp.Data = data rsp.Metadata = map[string]string{ "timestamp": time.Now().Format(time.RFC3339), "server": "go-micro-generic", } return nil } // HandlerFunc 定义处理函数类型 type HandlerFunc func(ctx context.Context, payload []byte, metadata map[string]string) (interface{}, error) // 处理器注册表 var handlers = make(map[string]map[string]HandlerFunc) // RegisterHandler 注册处理器 func RegisterHandler(service, method string, handler HandlerFunc) { if handlers[service] == nil { handlers[service] = make(map[string]HandlerFunc) } handlers[service][method] = handler } // GetHandler 获取处理器 func GetHandler(service, method string) (HandlerFunc, error) { if svc, ok := handlers[service]; ok { if h, ok := svc[method]; ok { return h, nil } } return nil, fmt.Errorf("handler not found: %s.%s", service, method) } // ============ 业务逻辑示例 ============ // UserRequest 用户请求 type UserRequest struct { ID int64 `json:"id"` Name string `json:"name"` } // UserResponse 用户响应 type UserResponse struct { ID int64 `json:"id"` Name string `json:"name"` CreatedAt string `json:"created_at"` } func init() { // 注册用户服务处理器 RegisterHandler("UserService", "GetUser", func(ctx context.Context, payload []byte, metadata map[string]string) (interface{}, error) { var req UserRequest if err := json.Unmarshal(payload, &req); err != nil { return nil, err } // 模拟数据库查询 return UserResponse{ ID: req.ID, Name: req.Name, CreatedAt: time.Now().Format(time.RFC3339), }, nil }) // 注册订单服务处理器 RegisterHandler("OrderService", "CreateOrder", func(ctx context.Context, payload []byte, metadata map[string]string) (interface{}, error) { var order map[string]interface{} if err := json.Unmarshal(payload, &order); err != nil { return nil, err } order["order_id"] = fmt.Sprintf("ORD-%d", time.Now().Unix()) order["status"] = "created" return order, nil }) } func main() { // 创建服务 service := micro.NewService( micro.Name("generic.grpc.server"), micro.Version("latest"), micro.Address(":8080"), ) // 初始化 service.Init() // 注册处理器 pb.RegisterGenericServiceHandler(service.Server(), &GenericServer{}) // 启动服务 logger.Info("Starting generic gRPC server on :8080") if err := service.Run(); err != nil { log.Fatal(err) } }5. 客户端实现 (client/main.go)
go
复制
package main import ( "context" "encoding/json" "fmt" "log" "time" "go-micro.dev/v4" "go-micro.dev/v4/client" "google.golang.org/grpc" pb "your-module/proto" ) func main() { // 创建服务 service := micro.NewService( micro.Name("generic.grpc.client"), ) service.Init() // 创建 gRPC 客户端 cli := service.Client() // 创建请求 req := &pb.GenericRequest{ Service: "UserService", Method: "GetUser", Payload: mustMarshal(map[string]interface{}{ "id": 123, "name": "张三", }), Metadata: map[string]string{ "request_id": "req-001", "client": "go-micro-client", }, } // 设置超时 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 调用服务 rsp := &pb.GenericResponse{} if err := cli.Call(ctx, cli.NewRequest("generic.grpc.server", "GenericService.Call", req), rsp); err != nil { log.Fatalf("Call failed: %v", err) } // 处理响应 fmt.Printf("Success: %v\n", rsp.Success) fmt.Printf("Code: %d\n", rsp.Code) fmt.Printf("Message: %s\n", rsp.Message) fmt.Printf("Data: %s\n", string(rsp.Data)) fmt.Printf("Metadata: %v\n", rsp.Metadata) } func mustMarshal(v interface{}) []byte { b, _ := json.Marshal(v) return b } // ============ 原生 gRPC 客户端示例 ============ func nativeGRPCClient() { // 连接到服务端 conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGenericServiceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() r, err := c.Call(ctx, &pb.GenericRequest{ Service: "OrderService", Method: "CreateOrder", Payload: mustMarshal(map[string]interface{}{ "product": "iPhone 15", "price": 6999.00, "count": 1, }), }) if err != nil { log.Fatalf("could not call: %v", err) } fmt.Printf("Response: %+v\n", r) }6. Go Mod 配置 (go.mod)
go
复制
module your-module go 1.21 require ( go-micro.dev/v4 v4.10.2 google.golang.org/grpc v1.59.0 google.golang.org/protobuf v1.31.0 )7. 高级特性:中间件与拦截器
go
复制
// 服务端拦截器(认证、日志、限流) func serverInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // 1. 认证检查 if err := authenticate(ctx); err != nil { return nil, err } // 2. 记录请求日志 start := time.Now() resp, err := handler(ctx, req) logger.Infof("Method: %s, Duration: %v, Error: %v", info.FullMethod, time.Since(start), err) return resp, err } // 客户端拦截器(重试、熔断、链路追踪) func clientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { // 添加请求头 md, _ := metadata.FromOutgoingContext(ctx) md.Append("x-request-id", generateRequestID()) ctx = metadata.NewOutgoingContext(ctx, md) // 重试逻辑 var err error for i := 0; i < 3; i++ { err = invoker(ctx, method, req, reply, cc, opts...) if err == nil { return nil } time.Sleep(time.Second * time.Duration(i+1)) } return err }8. 服务发现与负载均衡
go
复制
// 使用 etcd 作为服务发现 import "go-micro.dev/v4/registry/etcd" func main() { // etcd 注册中心 reg := etcd.NewRegistry(registry.Addrs("localhost:2379")) service := micro.NewService( micro.Name("generic.grpc.server"), micro.Registry(reg), micro.Version("v1.0.0"), // 负载均衡策略 micro.Client(client.NewClient( client.Selector(roundrobin.NewSelector()), )), ) // ... }关键特性总结
表格
| 特性 | 实现方式 |
|---|---|
| 通用接口 | 使用GenericRequest/Response封装所有请求 |
| 动态路由 | 通过service+method字段路由到具体处理器 |
| 可扩展性 | 注册表模式,动态添加业务处理器 |
| 传输格式 | 默认 JSON,可替换为 Protobuf/MsgPack |
| 中间件 | gRPC 拦截器实现认证、日志、限流 |
| 服务治理 | 集成 etcd 服务发现、负载均衡 |
这个设计允许你在不修改 proto 文件的情况下,通过动态注册处理器来扩展业务功能,非常适合需要快速迭代的微服务场景。需要我详细解释某个部分吗?
