package server import ( "context" "encoding/json" "fmt" "net" "os" "os/signal" "sync" "syscall" "time" "github.com/emsoft/HospitalPay-Go/internal/config" "github.com/emsoft/HospitalPay-Go/internal/model" "github.com/emsoft/HospitalPay-Go/internal/pkg/errcode" "github.com/emsoft/HospitalPay-Go/internal/pkg/logger" "github.com/emsoft/HospitalPay-Go/internal/pkg/metrics" "github.com/emsoft/HospitalPay-Go/internal/service" ) // SocketServer Socket 服务器 type SocketServer struct { criminalService service.ICriminalService listener net.Listener connLimit chan struct{} wg sync.WaitGroup shutdown chan struct{} } // NewSocketServer 创建 Socket 服务器 func NewSocketServer() *SocketServer { return &SocketServer{ criminalService: service.NewCriminalService(), connLimit: make(chan struct{}, config.GlobalConfig.Server.MaxConnNum), shutdown: make(chan struct{}), } } // Start 启动服务器 func (s *SocketServer) Start() error { var err error s.listener, err = net.Listen("tcp", ":"+config.GlobalConfig.Server.Port) if err != nil { return fmt.Errorf("failed to start socket server: %v", err) } // 注册信号处理 go s.handleSignal() logger.Infof("Socket server started on port %s", config.GlobalConfig.Server.Port) // 开始接受连接 go s.acceptConnections() // 等待关闭 <-s.shutdown return nil } // 接受连接 func (s *SocketServer) acceptConnections() { for { conn, err := s.listener.Accept() if err != nil { select { case <-s.shutdown: return default: logger.Errorf("Failed to accept connection: %v", err) continue } } // 记录指标 metrics.ActiveConnections.Inc() // 并发控制 select { case s.connLimit <- struct{}{}: s.wg.Add(1) go func() { defer func() { <-s.connLimit metrics.ActiveConnections.Dec() s.wg.Done() }() s.handleConnection(conn) }() default: logger.Warnf("Connection limit reached, rejecting connection from %s", conn.RemoteAddr()) conn.Close() } } } // 处理连接 func (s *SocketServer) handleConnection(conn net.Conn) { defer conn.Close() // 设置超时 if err := conn.SetDeadline(time.Now().Add(config.GlobalConfig.Server.ReadTimeout)); err != nil { logger.Errorf("Failed to set deadline: %v", err) return } // 记录连接信息 remoteAddr := conn.RemoteAddr().String() logger.Infof("New connection from %s", remoteAddr) buffer := make([]byte, 4096) n, err := conn.Read(buffer) if err != nil { logger.Errorf("Failed to read from connection: %v", err) return } message := string(buffer[:n]) if len(message) < 31 { // 最小消息长度:4(长度) + 4(功能码) + 4(医院编码) + 19(时间戳) logger.Errorf("Invalid message format from %s: %s", remoteAddr, message) return } // 解析消息 length := message[:4] functionCode := message[4:8] hospitalCode := message[8:12] timestamp := message[12:31] data := message[31:] logger.Infof("Received message from %s: length=%s, functionCode=%s, hospitalCode=%s, timestamp=%s, data=%s", remoteAddr, length, functionCode, hospitalCode, timestamp, data) // 请求计数 metrics.RequestCounter.WithLabelValues(functionCode).Inc() start := time.Now() // 验证医院编码 if hospitalCode != config.GlobalConfig.Server.HospitalCode { logger.Warnf("Invalid hospital code from %s: %s", remoteAddr, hospitalCode) s.sendResponse(conn, &model.CriminalResponse{ ResultCode: errcode.InvalidParams.Code, ResultMsg: "无效的医院编码", }) return } // 创建上下文 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // 处理请求 var response *model.CriminalResponse switch functionCode { case "0001": response = s.handleCriminalIn(ctx, data) case "0002": response = s.handleConsumeQuota(ctx, data) case "0003": response = s.handleCriminalOut(ctx, data) case "0004": response = s.handleConsumeRecord(ctx, data) case "0005": response = s.handleRealTimeBalance(ctx, data) case "0006": response = s.handleInvoiceSync(ctx, data) default: response = &model.CriminalResponse{ ResultCode: errcode.ServerError.Code, ResultMsg: "未知的功能码", } } // 记录处理时间 duration := time.Since(start).Seconds() metrics.RequestDuration.WithLabelValues(functionCode).Observe(duration) // 发送响应 s.sendResponse(conn, response) } // 处理入院登记 func (s *SocketServer) handleCriminalIn(ctx context.Context, data string) *model.CriminalResponse { var req model.CriminalRequest if err := json.Unmarshal([]byte(data), &req); err != nil { logger.Errorf("Failed to unmarshal CriminalIn request: %v", err) metrics.ErrorCounter.WithLabelValues("0001", errcode.InvalidParams.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.InvalidParams.Code, ResultMsg: "请求数据格式错误", } } resp, err := s.criminalService.CriminalIn(ctx, req.FCode) if err != nil { logger.Errorf("Failed to process CriminalIn: %v", err) metrics.ErrorCounter.WithLabelValues("0001", errcode.ServerError.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.ServerError.Code, ResultMsg: err.Error(), } } return resp } // 处理消费额度查询 func (s *SocketServer) handleConsumeQuota(ctx context.Context, data string) *model.CriminalResponse { var req model.CriminalRequest if err := json.Unmarshal([]byte(data), &req); err != nil { logger.Errorf("Failed to unmarshal ConsumeQuota request: %v", err) metrics.ErrorCounter.WithLabelValues("0002", errcode.InvalidParams.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.InvalidParams.Code, ResultMsg: "请求数据格式错误", } } resp, err := s.criminalService.ConsumeQuota(ctx, req.FCode) if err != nil { logger.Errorf("Failed to process ConsumeQuota: %v", err) metrics.ErrorCounter.WithLabelValues("0002", errcode.ServerError.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.ServerError.Code, ResultMsg: err.Error(), } } return resp } // 处理出院 func (s *SocketServer) handleCriminalOut(ctx context.Context, data string) *model.CriminalResponse { var req model.CriminalRequest if err := json.Unmarshal([]byte(data), &req); err != nil { logger.Errorf("Failed to unmarshal CriminalOut request: %v", err) metrics.ErrorCounter.WithLabelValues("0003", errcode.InvalidParams.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.InvalidParams.Code, ResultMsg: "请求数据格式错误", } } resp, err := s.criminalService.CriminalOut(ctx, req.FCode) if err != nil { logger.Errorf("Failed to process CriminalOut: %v", err) metrics.ErrorCounter.WithLabelValues("0003", errcode.ServerError.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.ServerError.Code, ResultMsg: err.Error(), } } return resp } // 处理消费记录 func (s *SocketServer) handleConsumeRecord(ctx context.Context, data string) *model.CriminalResponse { var record model.ConsumeRecord if err := json.Unmarshal([]byte(data), &record); err != nil { logger.Errorf("Failed to unmarshal ConsumeRecord request: %v", err) metrics.ErrorCounter.WithLabelValues("0004", errcode.InvalidParams.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.InvalidParams.Code, ResultMsg: "请求数据格式错误", } } resp, err := s.criminalService.ConsumeRecord(ctx, &record) if err != nil { logger.Errorf("Failed to process ConsumeRecord: %v", err) metrics.ErrorCounter.WithLabelValues("0004", errcode.ServerError.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.ServerError.Code, ResultMsg: err.Error(), } } return resp } // 处理实时余额查询 func (s *SocketServer) handleRealTimeBalance(ctx context.Context, data string) *model.CriminalResponse { var req model.CriminalRequest if err := json.Unmarshal([]byte(data), &req); err != nil { logger.Errorf("Failed to unmarshal RealTimeBalance request: %v", err) metrics.ErrorCounter.WithLabelValues("0005", errcode.InvalidParams.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.InvalidParams.Code, ResultMsg: "请求数据格式错误", } } resp, err := s.criminalService.RealTimeBalance(ctx, req.FCode) if err != nil { logger.Errorf("Failed to process RealTimeBalance: %v", err) metrics.ErrorCounter.WithLabelValues("0005", errcode.ServerError.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.ServerError.Code, ResultMsg: err.Error(), } } return resp } // 处理发票同步 func (s *SocketServer) handleInvoiceSync(ctx context.Context, data string) *model.CriminalResponse { var req model.InvoiceSync if err := json.Unmarshal([]byte(data), &req); err != nil { logger.Errorf("Failed to unmarshal InvoiceSync request: %v", err) metrics.ErrorCounter.WithLabelValues("0006", errcode.InvalidParams.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.InvalidParams.Code, ResultMsg: "请求数据格式错误", } } resp, err := s.criminalService.InvoiceSync(ctx, req.InvoiceList) if err != nil { logger.Errorf("Failed to process InvoiceSync: %v", err) metrics.ErrorCounter.WithLabelValues("0006", errcode.ServerError.Code).Inc() return &model.CriminalResponse{ ResultCode: errcode.ServerError.Code, ResultMsg: err.Error(), } } return resp } // 发送响应 func (s *SocketServer) sendResponse(conn net.Conn, response *model.CriminalResponse) { jsonData, err := json.Marshal(response) if err != nil { logger.Errorf("Failed to marshal response: %v", err) return } // 设置写超时 if err := conn.SetWriteDeadline(time.Now().Add(config.GlobalConfig.Server.WriteTimeout)); err != nil { logger.Errorf("Failed to set write deadline: %v", err) return } // 构造响应消息:长度(4位) + JSON数据 message := fmt.Sprintf("%04d%s", len(jsonData), string(jsonData)) if _, err := conn.Write([]byte(message)); err != nil { logger.Errorf("Failed to send response: %v", err) return } logger.Infof("Response sent to %s: %s", conn.RemoteAddr().String(), message) } // 优雅关闭 func (s *SocketServer) Stop() { logger.Info("Stopping socket server...") // 关闭监听器,停止接受新连接 if s.listener != nil { s.listener.Close() } // 等待所有连接处理完成 logger.Info("Waiting for all connections to finish...") done := make(chan struct{}) go func() { s.wg.Wait() close(done) }() // 设置超时 select { case <-done: logger.Info("All connections finished") case <-time.After(10 * time.Second): logger.Warn("Timeout waiting for connections to finish") } // 通知关闭完成 close(s.shutdown) } // 处理信号 func (s *SocketServer) handleSignal() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) // 等待信号 sig := <-c logger.Infof("Received signal: %v", sig) // 停止服务器 s.Stop() }