sponge/internal/server/grpc.go

160 lines
3.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package server
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"time"
"github.com/zhufuyi/sponge/config"
"github.com/zhufuyi/sponge/internal/service"
"github.com/zhufuyi/sponge/pkg/app"
"github.com/zhufuyi/sponge/pkg/grpc/interceptor"
"github.com/zhufuyi/sponge/pkg/grpc/metrics"
"github.com/zhufuyi/sponge/pkg/logger"
"github.com/zhufuyi/sponge/pkg/registry"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"google.golang.org/grpc"
)
var _ app.IServer = (*grpcServer)(nil)
type grpcServer struct {
addr string
server *grpc.Server
listen net.Listener
metricsHTTPServer *http.Server
goRunPromHTTPServer func() error
iRegistry registry.Registry
serviceInstance *registry.ServiceInstance
}
// Start grpc service
func (s *grpcServer) Start() error {
if s.iRegistry != nil {
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) //nolint
if err := s.iRegistry.Register(ctx, s.serviceInstance); err != nil {
return err
}
}
if s.goRunPromHTTPServer != nil {
if err := s.goRunPromHTTPServer(); err != nil {
return err
}
}
if err := s.server.Serve(s.listen); err != nil { // block
return err
}
return nil
}
// Stop grpc service
func (s *grpcServer) Stop() error {
if s.iRegistry != nil {
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) //nolint
if err := s.iRegistry.Deregister(ctx, s.serviceInstance); err != nil {
return err
}
}
s.server.GracefulStop()
if s.metricsHTTPServer != nil {
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) //nolint
if err := s.metricsHTTPServer.Shutdown(ctx); err != nil {
return err
}
}
return nil
}
// String comment
func (s *grpcServer) String() string {
return "grpc service, addr = " + s.addr
}
// InitServerOptions 初始化rpc设置
func (s *grpcServer) serverOptions() []grpc.ServerOption {
var options []grpc.ServerOption
unaryServerInterceptors := []grpc.UnaryServerInterceptor{
interceptor.UnaryServerRecovery(),
interceptor.UnaryServerCtxTags(),
}
streamServerInterceptors := []grpc.StreamServerInterceptor{}
// logger 拦截器
unaryServerInterceptors = append(unaryServerInterceptors, interceptor.UnaryServerLog(
logger.Get(),
))
// metrics 拦截器
if config.Get().App.EnableMetrics {
unaryServerInterceptors = append(unaryServerInterceptors, interceptor.UnaryServerMetrics())
s.goRunPromHTTPServer = func() error {
if s == nil || s.server == nil {
return errors.New("grpc server is nil")
}
promAddr := fmt.Sprintf(":%d", config.Get().Metrics.Port)
s.metricsHTTPServer = metrics.GoHTTPService(promAddr, s.server)
logger.Infof("start up prometheus http service, addr = %s", promAddr)
return nil
}
}
// limit 拦截器
if config.Get().App.EnableLimit {
unaryServerInterceptors = append(unaryServerInterceptors, interceptor.UnaryServerRateLimit(
interceptor.WithRateLimitQPS(config.Get().RateLimiter.QPSLimit),
))
}
// trace 拦截器
if config.Get().App.EnableTracing {
unaryServerInterceptors = append(unaryServerInterceptors, interceptor.UnaryServerTracing())
}
unaryServer := grpc_middleware.WithUnaryServerChain(unaryServerInterceptors...)
streamServer := grpc_middleware.WithStreamServerChain(streamServerInterceptors...)
options = append(options, unaryServer, streamServer)
return options
}
// NewGRPCServer 创建一个grpc服务
func NewGRPCServer(addr string, opts ...GRPCOption) app.IServer {
var err error
o := defaultGRPCOptions()
o.apply(opts...)
s := &grpcServer{
addr: addr,
iRegistry: o.iRegistry,
serviceInstance: o.instance,
}
// 监听TCP端口
s.listen, err = net.Listen("tcp", addr)
if err != nil {
panic(err)
}
// 创建grpc server对象拦截器可以在这里注入
s.server = grpc.NewServer(s.serverOptions()...)
// 注册所有服务
service.RegisterAllService(s.server)
return s
}