From 7e021370666c511b614a2148be17849b5058a818 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E6=A5=BD=E5=9D=82=E3=83=8B=E3=83=A3=E3=83=B3?= Date: Tue, 9 Jul 2024 13:00:08 +0000 Subject: [PATCH 1/9] refactor(server): reorganize the project structure --- server/internal/code.go | 29 -- server/internal/common/code.go | 29 ++ server/internal/common/request.go | 25 ++ server/internal/http_server.go | 332 +---------------------- server/internal/router/router.go | 16 ++ server/internal/service/service.go | 347 ++++++++++++++++++++++++ server/internal/{ => service}/worker.go | 35 +-- server/main.go | 9 +- 8 files changed, 439 insertions(+), 383 deletions(-) delete mode 100644 server/internal/code.go create mode 100644 server/internal/common/code.go create mode 100644 server/internal/common/request.go create mode 100644 server/internal/router/router.go create mode 100644 server/internal/service/service.go rename server/internal/{ => service}/worker.go (71%) diff --git a/server/internal/code.go b/server/internal/code.go deleted file mode 100644 index 102d49a9e5..0000000000 --- a/server/internal/code.go +++ /dev/null @@ -1,29 +0,0 @@ -package internal - -type Code struct { - code string - msg string -} - -var ( - codeOk = NewCode("0", "ok") - codeSuccess = NewCode("0", "success") - - codeErrParamsInvalid = NewCode("10000", "params invalid") - codeErrWorkersLimit = NewCode("10001", "workers limit") - codeErrChannelNotExisted = NewCode("10002", "channel not existed") - codeErrChannelExisted = NewCode("10003", "channel existed") - codeErrChannelEmpty = NewCode("10004", "channel empty") - codeErrGenerateTokenFailed = NewCode("10005", "generate token failed") - - codeErrProcessManifestFailed = NewCode("10100", "process manifest json failed") - codeErrStartWorkerFailed = NewCode("10101", "start worker failed") - codeErrStopAppFailed = NewCode("10102", "stop worker failed") -) - -func NewCode(code string, msg string) *Code { - return &Code{ - code: code, - msg: msg, - } -} diff --git a/server/internal/common/code.go b/server/internal/common/code.go new file mode 100644 index 0000000000..a77cb6f540 --- /dev/null +++ b/server/internal/common/code.go @@ -0,0 +1,29 @@ +package common + +type Code struct { + Code string + Msg string +} + +var ( + CodeOk = NewCode("0", "ok") + CodeSuccess = NewCode("0", "success") + + CodeErrParamsInvalid = NewCode("10000", "params invalid") + CodeErrWorkersLimit = NewCode("10001", "workers limit") + CodeErrChannelNotExisted = NewCode("10002", "channel not existed") + CodeErrChannelExisted = NewCode("10003", "channel existed") + CodeErrChannelEmpty = NewCode("10004", "channel empty") + CodeErrGenerateTokenFailed = NewCode("10005", "generate token failed") + + CodeErrProcessManifestFailed = NewCode("10100", "process manifest json failed") + CodeErrStartWorkerFailed = NewCode("10101", "start worker failed") + CodeErrStopAppFailed = NewCode("10102", "stop worker failed") +) + +func NewCode(code string, msg string) *Code { + return &Code{ + Code: code, + Msg: msg, + } +} diff --git a/server/internal/common/request.go b/server/internal/common/request.go new file mode 100644 index 0000000000..0d48eb320b --- /dev/null +++ b/server/internal/common/request.go @@ -0,0 +1,25 @@ +package common + +type PingReq struct { + RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` + ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` +} + +type StartReq struct { + RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` + AgoraAsrLanguage string `form:"agora_asr_language,omitempty" json:"agora_asr_language,omitempty"` + ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` + RemoteStreamId uint32 `form:"remote_stream_id,omitempty" json:"remote_stream_id,omitempty"` + VoiceType string `form:"voice_type,omitempty" json:"voice_type,omitempty"` +} + +type StopReq struct { + RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` + ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` +} + +type GenerateTokenReq struct { + RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` + ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` + Uid uint32 `form:"uid,omitempty" json:"uid,omitempty"` +} diff --git a/server/internal/http_server.go b/server/internal/http_server.go index cf49a94f0f..ca86295bf0 100644 --- a/server/internal/http_server.go +++ b/server/internal/http_server.go @@ -11,19 +11,11 @@ package internal import ( - "fmt" + "app/internal/router" + "app/internal/service" "log/slog" - "net/http" - "os" - "strings" - "time" - rtctokenbuilder "github.com/AgoraIO/Tools/DynamicKey/AgoraDynamicKey/go/src/rtctokenbuilder2" "github.com/gin-gonic/gin" - "github.com/gin-gonic/gin/binding" - "github.com/gogf/gf/crypto/gmd5" - "github.com/tidwall/gjson" - "github.com/tidwall/sjson" ) type HttpServer struct { @@ -41,71 +33,7 @@ type HttpServerConfig struct { WorkerQuitTimeoutSeconds int } -type PingReq struct { - RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` - ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` -} - -type StartReq struct { - RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` - AgoraAsrLanguage string `form:"agora_asr_language,omitempty" json:"agora_asr_language,omitempty"` - ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` - RemoteStreamId uint32 `form:"remote_stream_id,omitempty" json:"remote_stream_id,omitempty"` - VoiceType string `form:"voice_type,omitempty" json:"voice_type,omitempty"` -} - -type StopReq struct { - RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` - ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` -} - -type GenerateTokenReq struct { - RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` - ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` - Uid uint32 `form:"uid,omitempty" json:"uid,omitempty"` -} - -const ( - privilegeExpirationInSeconds = uint32(86400) - tokenExpirationInSeconds = uint32(86400) - - languageChinese = "zh-CN" - languageEnglish = "en-US" - - ManifestJsonFile = "./agents/manifest.json" - ManifestJsonFileElevenlabs = "./agents/manifest.elevenlabs.json" - - TTSVendorAzure = "azure" - TTSVendorElevenlabs = "elevenlabs" - - voiceTypeMale = "male" - voiceTypeFemale = "female" -) - var ( - voiceNameMap = map[string]map[string]map[string]string{ - languageChinese: { - TTSVendorAzure: { - voiceTypeMale: "zh-CN-YunxiNeural", - voiceTypeFemale: "zh-CN-XiaoxiaoNeural", - }, - TTSVendorElevenlabs: { - voiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam - voiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice - }, - }, - languageEnglish: { - TTSVendorAzure: { - voiceTypeMale: "en-US-BrianNeural", - voiceTypeFemale: "en-US-JaneNeural", - }, - TTSVendorElevenlabs: { - voiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam - voiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice - }, - }, - } - logTag = slog.String("service", "HTTP_SERVER") ) @@ -115,256 +43,24 @@ func NewHttpServer(httpServerConfig *HttpServerConfig) *HttpServer { } } -func (s *HttpServer) getManifestJsonFile(language string) (manifestJsonFile string) { - ttsVendor := s.getTtsVendor(language) - manifestJsonFile = ManifestJsonFile - - if ttsVendor == TTSVendorElevenlabs { - manifestJsonFile = ManifestJsonFileElevenlabs - } - - return -} - -func (s *HttpServer) getTtsVendor(language string) string { - if language == languageChinese { - return s.config.TTSVendorChinese - } - - return s.config.TTSVendorEnglish -} - -func (s *HttpServer) handlerHealth(c *gin.Context) { - slog.Debug("handlerHealth", logTag) - s.output(c, codeOk, nil) -} - -func (s *HttpServer) handlerPing(c *gin.Context) { - var req PingReq - - if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { - slog.Error("handlerPing params invalid", "err", err, logTag) - s.output(c, codeErrParamsInvalid, http.StatusBadRequest) - return - } - - slog.Info("handlerPing start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) - - if strings.TrimSpace(req.ChannelName) == "" { - slog.Error("handlerPing channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) - s.output(c, codeErrChannelEmpty, http.StatusBadRequest) - return - } - - if !workers.Contains(req.ChannelName) { - slog.Error("handlerPing channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) - s.output(c, codeErrChannelNotExisted, http.StatusBadRequest) - return - } - - // Update worker - worker := workers.Get(req.ChannelName).(*Worker) - worker.UpdateTs = time.Now().Unix() - - slog.Info("handlerPing end", "worker", worker, "requestId", req.RequestId, logTag) - s.output(c, codeSuccess, nil) -} - -func (s *HttpServer) handlerStart(c *gin.Context) { - workersRunning := workers.Size() - - slog.Info("handlerStart start", "workersRunning", workersRunning, logTag) - - var req StartReq - if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { - slog.Error("handlerStart params invalid", "err", err, "requestId", req.RequestId, logTag) - s.output(c, codeErrParamsInvalid, http.StatusBadRequest) - return - } - - if strings.TrimSpace(req.ChannelName) == "" { - slog.Error("handlerStart channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) - s.output(c, codeErrChannelEmpty, http.StatusBadRequest) - return - } - - if workersRunning >= s.config.WorkersMax { - slog.Error("handlerStart workers exceed", "workersRunning", workersRunning, "WorkersMax", s.config.WorkersMax, "requestId", req.RequestId, logTag) - s.output(c, codeErrWorkersLimit, http.StatusTooManyRequests) - return - } - - if workers.Contains(req.ChannelName) { - slog.Error("handlerStart channel existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) - s.output(c, codeErrChannelExisted, http.StatusBadRequest) - return - } - - manifestJsonFile, logFile, err := s.processManifest(&req) - if err != nil { - slog.Error("handlerStart process manifest", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) - s.output(c, codeErrProcessManifestFailed, http.StatusInternalServerError) - return - } - - worker := newWorker(req.ChannelName, logFile, manifestJsonFile) - worker.QuitTimeoutSeconds = s.config.WorkerQuitTimeoutSeconds - if err := worker.start(&req); err != nil { - slog.Error("handlerStart start worker failed", "err", err, "requestId", req.RequestId, logTag) - s.output(c, codeErrStartWorkerFailed, http.StatusInternalServerError) - return - } - workers.SetIfNotExist(req.ChannelName, worker) - - slog.Info("handlerStart end", "workersRunning", workers.Size(), "worker", worker, "requestId", req.RequestId, logTag) - s.output(c, codeSuccess, nil) -} - -func (s *HttpServer) handlerStop(c *gin.Context) { - var req StopReq - - if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { - slog.Error("handlerStop params invalid", "err", err, logTag) - s.output(c, codeErrParamsInvalid, http.StatusBadRequest) - return - } - - slog.Info("handlerStop start", "req", req, logTag) - - if strings.TrimSpace(req.ChannelName) == "" { - slog.Error("handlerStop channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) - s.output(c, codeErrChannelEmpty, http.StatusBadRequest) - return - } - - if !workers.Contains(req.ChannelName) { - slog.Error("handlerStop channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) - s.output(c, codeErrChannelNotExisted, http.StatusBadRequest) - return - } - - worker := workers.Get(req.ChannelName).(*Worker) - if err := worker.stop(req.RequestId, req.ChannelName); err != nil { - slog.Error("handlerStop kill app failed", "err", err, "worker", workers.Get(req.ChannelName), "requestId", req.RequestId, logTag) - s.output(c, codeErrStopAppFailed, http.StatusInternalServerError) - return - } - - slog.Info("handlerStop end", "requestId", req.RequestId, logTag) - s.output(c, codeSuccess, nil) -} - -func (s *HttpServer) handlerGenerateToken(c *gin.Context) { - var req GenerateTokenReq - - if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { - slog.Error("handlerGenerateToken params invalid", "err", err, logTag) - s.output(c, codeErrParamsInvalid, http.StatusBadRequest) - return - } - - slog.Info("handlerGenerateToken start", "req", req, logTag) - - if strings.TrimSpace(req.ChannelName) == "" { - slog.Error("handlerGenerateToken channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) - s.output(c, codeErrChannelEmpty, http.StatusBadRequest) - return - } - - if s.config.AppCertificate == "" { - s.output(c, codeSuccess, map[string]any{"appId": s.config.AppId, "token": s.config.AppId, "channel_name": req.ChannelName, "uid": req.Uid}) - return - } - - token, err := rtctokenbuilder.BuildTokenWithUid(s.config.AppId, s.config.AppCertificate, req.ChannelName, req.Uid, rtctokenbuilder.RolePublisher, tokenExpirationInSeconds, privilegeExpirationInSeconds) - if err != nil { - slog.Error("handlerGenerateToken generate token failed", "err", err, "requestId", req.RequestId, logTag) - s.output(c, codeErrGenerateTokenFailed, http.StatusBadRequest) - return - } - - slog.Info("handlerGenerateToken end", "requestId", req.RequestId, logTag) - s.output(c, codeSuccess, map[string]any{"appId": s.config.AppId, "token": token, "channel_name": req.ChannelName, "uid": req.Uid}) -} - -func (s *HttpServer) output(c *gin.Context, code *Code, data any, httpStatus ...int) { - if len(httpStatus) == 0 { - httpStatus = append(httpStatus, http.StatusOK) - } - - c.JSON(httpStatus[0], gin.H{"code": code.code, "msg": code.msg, "data": data}) -} - -func (s *HttpServer) processManifest(req *StartReq) (manifestJsonFile string, logFile string, err error) { - manifestJsonFile = s.getManifestJsonFile(req.AgoraAsrLanguage) - content, err := os.ReadFile(manifestJsonFile) - if err != nil { - slog.Error("handlerStart read manifest.json failed", "err", err, "manifestJsonFile", manifestJsonFile, "requestId", req.RequestId, logTag) - return - } - - manifestJson := string(content) - - if s.config.AppId != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`, s.config.AppId) - } - appId := gjson.Get(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`).String() - - // Generate token - token := appId - if s.config.AppCertificate != "" { - token, err = rtctokenbuilder.BuildTokenWithUid(appId, s.config.AppCertificate, req.ChannelName, 0, rtctokenbuilder.RoleSubscriber, tokenExpirationInSeconds, privilegeExpirationInSeconds) - if err != nil { - slog.Error("handlerStart generate token failed", "err", err, "requestId", req.RequestId, logTag) - return - } - } - - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.token`, token) - if req.AgoraAsrLanguage != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_language`, req.AgoraAsrLanguage) - } - if req.ChannelName != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.channel`, req.ChannelName) - } - if req.RemoteStreamId != 0 { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.remote_stream_id`, req.RemoteStreamId) - } - - language := gjson.Get(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_language`).String() - - ttsVendor := s.getTtsVendor(language) - voiceName := voiceNameMap[language][ttsVendor][req.VoiceType] - if voiceName != "" { - if ttsVendor == TTSVendorAzure { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="azure_tts").property.azure_synthesis_voice_name`, voiceName) - } else if ttsVendor == TTSVendorElevenlabs { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="elevenlabs_tts").property.voice_id`, voiceName) - } - } - - channelNameMd5 := gmd5.MustEncryptString(req.ChannelName) - ts := time.Now().UnixNano() - manifestJsonFile = fmt.Sprintf("/tmp/manifest-%s-%d.json", channelNameMd5, ts) - logFile = fmt.Sprintf("/tmp/app-%s-%d.log", channelNameMd5, ts) - os.WriteFile(manifestJsonFile, []byte(manifestJson), 0644) - - return -} - func (s *HttpServer) Start() { r := gin.Default() r.Use(corsMiddleware()) - r.GET("/", s.handlerHealth) - r.GET("/health", s.handlerHealth) - r.POST("/ping", s.handlerPing) - r.POST("/start", s.handlerStart) - r.POST("/stop", s.handlerStop) - r.POST("/token/generate", s.handlerGenerateToken) + mainSvcConf := service.MainServiceConfig{ + AppId: s.config.AppId, + AppCertificate: s.config.AppCertificate, + ManifestJsonFile: s.config.ManifestJsonFile, + TTSVendorChinese: s.config.TTSVendorChinese, + TTSVendorEnglish: s.config.TTSVendorEnglish, + WorkersMax: s.config.WorkersMax, + WorkerQuitTimeoutSeconds: s.config.WorkerQuitTimeoutSeconds, + } + mainSvc := service.NewMainService(mainSvcConf) + router.Apply(r, mainSvc) slog.Info("server start", "port", s.config.Port, logTag) - go cleanWorker() + go mainSvc.CleanWorker() r.Run(s.config.Port) } diff --git a/server/internal/router/router.go b/server/internal/router/router.go new file mode 100644 index 0000000000..02dc67b5b6 --- /dev/null +++ b/server/internal/router/router.go @@ -0,0 +1,16 @@ +package router + +import ( + "app/internal/service" + + "github.com/gin-gonic/gin" +) + +func Apply(r gin.IRouter, mainSvc *service.MainService) { + r.GET("/", mainSvc.HandlerHealth) + r.GET("/health", mainSvc.HandlerHealth) + r.POST("/ping", mainSvc.HandlerPing) + r.POST("/start", mainSvc.HandlerStart) + r.POST("/stop", mainSvc.HandlerStop) + r.POST("/token/generate", mainSvc.HandlerGenerateToken) +} diff --git a/server/internal/service/service.go b/server/internal/service/service.go new file mode 100644 index 0000000000..22cdc1ce62 --- /dev/null +++ b/server/internal/service/service.go @@ -0,0 +1,347 @@ +package service + +import ( + "app/internal/common" + "fmt" + "log/slog" + "net/http" + "os" + "strings" + "time" + + rtctokenbuilder "github.com/AgoraIO/Tools/DynamicKey/AgoraDynamicKey/go/src/rtctokenbuilder2" + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" + "github.com/gogf/gf/container/gmap" + "github.com/gogf/gf/crypto/gmd5" + "github.com/google/uuid" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" +) + +const ( + privilegeExpirationInSeconds = uint32(86400) + tokenExpirationInSeconds = uint32(86400) + + languageChinese = "zh-CN" + languageEnglish = "en-US" + + ManifestJsonFile = "./agents/manifest.json" + ManifestJsonFileElevenlabs = "./agents/manifest.elevenlabs.json" + + TTSVendorAzure = "azure" + TTSVendorElevenlabs = "elevenlabs" + + voiceTypeMale = "male" + voiceTypeFemale = "female" +) + +var ( + voiceNameMap = map[string]map[string]map[string]string{ + languageChinese: { + TTSVendorAzure: { + voiceTypeMale: "zh-CN-YunxiNeural", + voiceTypeFemale: "zh-CN-XiaoxiaoNeural", + }, + TTSVendorElevenlabs: { + voiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam + voiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice + }, + }, + languageEnglish: { + TTSVendorAzure: { + voiceTypeMale: "en-US-BrianNeural", + voiceTypeFemale: "en-US-JaneNeural", + }, + TTSVendorElevenlabs: { + voiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam + voiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice + }, + }, + } + logTag = slog.String("service", "MAIN_SERVICE") +) + +type MainService struct { + config MainServiceConfig + workers *gmap.Map +} + +type MainServiceConfig struct { + AppId string + AppCertificate string + ManifestJsonFile string + TTSVendorChinese string + TTSVendorEnglish string + WorkersMax int + WorkerQuitTimeoutSeconds int +} + +func NewMainService(config MainServiceConfig) *MainService { + return &MainService{ + config: config, + workers: gmap.New(true), + } +} + +func (s *MainService) output(c *gin.Context, code *common.Code, data any, httpStatus ...int) { + if len(httpStatus) == 0 { + httpStatus = append(httpStatus, http.StatusOK) + } + + c.JSON(httpStatus[0], gin.H{"code": code.Code, "msg": code.Msg, "data": data}) +} + +func (s *MainService) HandlerHealth(c *gin.Context) { + slog.Debug("handlerHealth", logTag) + s.output(c, common.CodeOk, nil) +} + +func (s *MainService) HandlerPing(c *gin.Context) { + var req common.PingReq + + if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { + slog.Error("handlerPing params invalid", "err", err, logTag) + s.output(c, common.CodeErrParamsInvalid, http.StatusBadRequest) + return + } + + slog.Info("handlerPing start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + + if strings.TrimSpace(req.ChannelName) == "" { + slog.Error("handlerPing channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrChannelEmpty, http.StatusBadRequest) + return + } + + if !s.workers.Contains(req.ChannelName) { + slog.Error("handlerPing channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrChannelNotExisted, http.StatusBadRequest) + return + } + + // Update worker + worker := s.workers.Get(req.ChannelName).(*Worker) + worker.UpdateTs = time.Now().Unix() + + slog.Info("handlerPing end", "worker", worker, "requestId", req.RequestId, logTag) + s.output(c, common.CodeSuccess, nil) +} + +// HandlerStart is a handle for start worker. +func (s *MainService) HandlerStart(c *gin.Context) { + workersRunning := s.workers.Size() + + slog.Info("handlerStart start", "workersRunning", workersRunning, logTag) + + var req common.StartReq + if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { + slog.Error("handlerStart params invalid", "err", err, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrParamsInvalid, http.StatusBadRequest) + return + } + + if strings.TrimSpace(req.ChannelName) == "" { + slog.Error("handlerStart channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrChannelEmpty, http.StatusBadRequest) + return + } + + if workersRunning >= s.config.WorkersMax { + slog.Error("handlerStart workers exceed", "workersRunning", workersRunning, "WorkersMax", s.config.WorkersMax, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrWorkersLimit, http.StatusTooManyRequests) + return + } + + if s.workers.Contains(req.ChannelName) { + slog.Error("handlerStart channel existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrChannelExisted, http.StatusBadRequest) + return + } + + manifestJsonFile, logFile, err := s.processWorkerManifest(&req) + if err != nil { + slog.Error("handlerStart process manifest", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrProcessManifestFailed, http.StatusInternalServerError) + return + } + + worker := newWorker(req.ChannelName, logFile, manifestJsonFile) + worker.QuitTimeoutSeconds = s.config.WorkerQuitTimeoutSeconds + if err := worker.start(&req); err != nil { + slog.Error("handlerStart start worker failed", "err", err, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrStartWorkerFailed, http.StatusInternalServerError) + return + } + s.workers.SetIfNotExist(req.ChannelName, worker) + + slog.Info("handlerStart end", "workersRunning", s.workers.Size(), "worker", worker, "requestId", req.RequestId, logTag) + s.output(c, common.CodeSuccess, nil) +} + +func (s *MainService) HandlerStop(c *gin.Context) { + var req common.StopReq + + if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { + slog.Error("handlerStop params invalid", "err", err, logTag) + s.output(c, common.CodeErrParamsInvalid, http.StatusBadRequest) + return + } + + slog.Info("handlerStop start", "req", req, logTag) + + if strings.TrimSpace(req.ChannelName) == "" { + slog.Error("handlerStop channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrChannelEmpty, http.StatusBadRequest) + return + } + + if !s.workers.Contains(req.ChannelName) { + slog.Error("handlerStop channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrChannelNotExisted, http.StatusBadRequest) + return + } + + worker := s.workers.Get(req.ChannelName).(*Worker) + if err := worker.stop(req.RequestId, req.ChannelName); err != nil { + slog.Error("handlerStop kill app failed", "err", err, "worker", s.workers.Get(req.ChannelName), "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrStopAppFailed, http.StatusInternalServerError) + return + } + s.workers.Remove(req.ChannelName) + + slog.Info("handlerStop end", "requestId", req.RequestId, logTag) + s.output(c, common.CodeSuccess, nil) +} + +func (s *MainService) HandlerGenerateToken(c *gin.Context) { + var req common.GenerateTokenReq + + if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { + slog.Error("handlerGenerateToken params invalid", "err", err, logTag) + s.output(c, common.CodeErrParamsInvalid, http.StatusBadRequest) + return + } + + slog.Info("handlerGenerateToken start", "req", req, logTag) + + if strings.TrimSpace(req.ChannelName) == "" { + slog.Error("handlerGenerateToken channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrChannelEmpty, http.StatusBadRequest) + return + } + + if s.config.AppCertificate == "" { + s.output(c, common.CodeSuccess, map[string]any{"appId": s.config.AppId, "token": s.config.AppId, "channel_name": req.ChannelName, "uid": req.Uid}) + return + } + + token, err := rtctokenbuilder.BuildTokenWithUid(s.config.AppId, s.config.AppCertificate, req.ChannelName, req.Uid, rtctokenbuilder.RolePublisher, tokenExpirationInSeconds, privilegeExpirationInSeconds) + if err != nil { + slog.Error("handlerGenerateToken generate token failed", "err", err, "requestId", req.RequestId, logTag) + s.output(c, common.CodeErrGenerateTokenFailed, http.StatusBadRequest) + return + } + + slog.Info("handlerGenerateToken end", "requestId", req.RequestId, logTag) + s.output(c, common.CodeSuccess, map[string]any{"appId": s.config.AppId, "token": token, "channel_name": req.ChannelName, "uid": req.Uid}) +} + +// processWorkerManifest create channel temporary Mainfest. +func (s *MainService) processWorkerManifest(req *common.StartReq) (manifestJsonFile string, logFile string, err error) { + manifestJsonFile = s.getManifestJsonFile(req.AgoraAsrLanguage) + content, err := os.ReadFile(s.config.ManifestJsonFile) + if err != nil { + slog.Error("handlerStart read manifest.json failed", "err", err, "manifestJsonFile", s.config.ManifestJsonFile, "requestId", req.RequestId, logTag) + return + } + + manifestJson := string(content) + + if s.config.AppId != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`, s.config.AppId) + } + appId := gjson.Get(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`).String() + + // Generate token + token := appId + if s.config.AppCertificate != "" { + token, err = rtctokenbuilder.BuildTokenWithUid(appId, s.config.AppCertificate, req.ChannelName, 0, rtctokenbuilder.RoleSubscriber, tokenExpirationInSeconds, privilegeExpirationInSeconds) + if err != nil { + slog.Error("handlerStart generate token failed", "err", err, "requestId", req.RequestId, logTag) + return + } + } + + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.token`, token) + if req.AgoraAsrLanguage != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_language`, req.AgoraAsrLanguage) + } + if req.ChannelName != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.channel`, req.ChannelName) + } + if req.RemoteStreamId != 0 { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.remote_stream_id`, req.RemoteStreamId) + } + + language := gjson.Get(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_language`).String() + + ttsVendor := s.getTtsVendor(language) + voiceName := voiceNameMap[language][ttsVendor][req.VoiceType] + if voiceName != "" { + if ttsVendor == TTSVendorAzure { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="azure_tts").property.azure_synthesis_voice_name`, voiceName) + } else if ttsVendor == TTSVendorElevenlabs { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="elevenlabs_tts").property.voice_id`, voiceName) + } + } + + channelNameMd5 := gmd5.MustEncryptString(req.ChannelName) + ts := time.Now().UnixNano() + manifestJsonFile = fmt.Sprintf("/tmp/manifest-%s-%d.json", channelNameMd5, ts) + logFile = fmt.Sprintf("/tmp/app-%s-%d.log", channelNameMd5, ts) + os.WriteFile(manifestJsonFile, []byte(manifestJson), 0644) + + return +} + +// CleanWorker clean unused workers in background. +func (s *MainService) CleanWorker() { + for { + for _, channelName := range s.workers.Keys() { + worker := s.workers.Get(channelName).(*Worker) + + nowTs := time.Now().Unix() + if worker.UpdateTs+int64(worker.QuitTimeoutSeconds) < nowTs { + if err := worker.stop(uuid.New().String(), channelName.(string)); err != nil { + slog.Error("Worker cleanWorker failed", "err", err, "channelName", channelName, logTag) + continue + } + + slog.Info("Worker cleanWorker success", "channelName", channelName, "worker", worker, "nowTs", nowTs, logTag) + } + } + + slog.Debug("Worker cleanWorker sleep", "sleep", workerCleanSleepSeconds, logTag) + time.Sleep(workerCleanSleepSeconds * time.Second) + } +} + +func (s *MainService) getManifestJsonFile(language string) (manifestJsonFile string) { + ttsVendor := s.getTtsVendor(language) + manifestJsonFile = ManifestJsonFile + + if ttsVendor == TTSVendorElevenlabs { + manifestJsonFile = ManifestJsonFileElevenlabs + } + + return +} + +func (s *MainService) getTtsVendor(language string) string { + if language == languageChinese { + return s.config.TTSVendorChinese + } + + return s.config.TTSVendorEnglish +} diff --git a/server/internal/worker.go b/server/internal/service/worker.go similarity index 71% rename from server/internal/worker.go rename to server/internal/service/worker.go index e3daf80a83..727f2f513f 100644 --- a/server/internal/worker.go +++ b/server/internal/service/worker.go @@ -1,15 +1,13 @@ -package internal +package service import ( + "app/internal/common" "fmt" "log/slog" "os/exec" "strconv" "strings" "time" - - "github.com/gogf/gf/container/gmap" - "github.com/google/uuid" ) type Worker struct { @@ -27,10 +25,6 @@ const ( workerExec = "/app/agents/bin/worker" ) -var ( - workers = gmap.New(true) -) - func newWorker(channelName string, logFile string, manifestJsonFile string) *Worker { return &Worker{ ChannelName: channelName, @@ -42,7 +36,7 @@ func newWorker(channelName string, logFile string, manifestJsonFile string) *Wor } } -func (w *Worker) start(req *StartReq) (err error) { +func (w *Worker) start(req *common.StartReq) (err error) { shell := fmt.Sprintf("cd /app/agents && nohup %s --manifest %s > %s 2>&1 &", workerExec, w.ManifestJsonFile, w.LogFile) slog.Info("Worker start", "requestId", req.RequestId, "shell", shell, logTag) if _, err = exec.Command("sh", "-c", shell).CombinedOutput(); err != nil { @@ -78,29 +72,6 @@ func (w *Worker) stop(requestId string, channelName string) (err error) { return } - workers.Remove(channelName) - slog.Info("Worker stop end", "channelName", channelName, "worker", w, "requestId", requestId, logTag) return } - -func cleanWorker() { - for { - for _, channelName := range workers.Keys() { - worker := workers.Get(channelName).(*Worker) - - nowTs := time.Now().Unix() - if worker.UpdateTs+int64(worker.QuitTimeoutSeconds) < nowTs { - if err := worker.stop(uuid.New().String(), channelName.(string)); err != nil { - slog.Error("Worker cleanWorker failed", "err", err, "channelName", channelName, logTag) - continue - } - - slog.Info("Worker cleanWorker success", "channelName", channelName, "worker", worker, "nowTs", nowTs, logTag) - } - } - - slog.Debug("Worker cleanWorker sleep", "sleep", workerCleanSleepSeconds, logTag) - time.Sleep(workerCleanSleepSeconds * time.Second) - } -} diff --git a/server/main.go b/server/main.go index d190387bc2..4f1f8e09f2 100644 --- a/server/main.go +++ b/server/main.go @@ -9,6 +9,7 @@ import ( "github.com/tidwall/sjson" "app/internal" + "app/internal/service" ) func main() { @@ -16,12 +17,12 @@ func main() { ttsVendorChinese := os.Getenv("TTS_VENDOR_CHINESE") if len(ttsVendorChinese) == 0 { - ttsVendorChinese = internal.TTSVendorAzure + ttsVendorChinese = service.TTSVendorAzure //TODO vendor provider } ttsVendorEnglish := os.Getenv("TTS_VENDOR_ENGLISH") if len(ttsVendorEnglish) == 0 { - ttsVendorEnglish = internal.TTSVendorAzure + ttsVendorEnglish = service.TTSVendorAzure //TODO vendor provider } workersMax, err := strconv.Atoi(os.Getenv("WORKERS_MAX")) @@ -46,8 +47,8 @@ func main() { slog.Info("server config", "ttsVendorChinese", httpServerConfig.TTSVendorChinese, "ttsVendorEnglish", httpServerConfig.TTSVendorEnglish, "workersMax", httpServerConfig.WorkersMax, "workerQuitTimeoutSeconds", httpServerConfig.WorkerQuitTimeoutSeconds) - processManifest(internal.ManifestJsonFile) - processManifest(internal.ManifestJsonFileElevenlabs) + processManifest(service.ManifestJsonFile) + processManifest(service.ManifestJsonFileElevenlabs) httpServer := internal.NewHttpServer(httpServerConfig) httpServer.Start() } From 2a50ea63ccb16212c534cef33333a20ed9c9cf1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E6=A5=BD=E5=9D=82=E3=83=8B=E3=83=A3=E3=83=B3?= Date: Tue, 9 Jul 2024 13:44:32 +0000 Subject: [PATCH 2/9] feat(server): graceful exit --- server/internal/http_server.go | 21 ++++++++++++++----- server/main.go | 37 ++++++++++++++++++++++++++++++++-- 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/server/internal/http_server.go b/server/internal/http_server.go index ca86295bf0..5cf6255c61 100644 --- a/server/internal/http_server.go +++ b/server/internal/http_server.go @@ -13,20 +13,23 @@ package internal import ( "app/internal/router" "app/internal/service" + "context" "log/slog" + "net/http" "github.com/gin-gonic/gin" ) type HttpServer struct { config *HttpServerConfig + server *http.Server } type HttpServerConfig struct { AppId string AppCertificate string ManifestJsonFile string - Port string + Address string TTSVendorChinese string TTSVendorEnglish string WorkersMax int @@ -43,7 +46,7 @@ func NewHttpServer(httpServerConfig *HttpServerConfig) *HttpServer { } } -func (s *HttpServer) Start() { +func (s *HttpServer) Run() error { r := gin.Default() r.Use(corsMiddleware()) @@ -57,10 +60,18 @@ func (s *HttpServer) Start() { WorkerQuitTimeoutSeconds: s.config.WorkerQuitTimeoutSeconds, } mainSvc := service.NewMainService(mainSvcConf) + go mainSvc.CleanWorker() router.Apply(r, mainSvc) - slog.Info("server start", "port", s.config.Port, logTag) + slog.Info("server start", "address", s.config.Address, logTag) - go mainSvc.CleanWorker() - r.Run(s.config.Port) + s.server = &http.Server{ + Addr: s.config.Address, + Handler: r, + } + return s.server.ListenAndServe() +} + +func (s *HttpServer) Shutdown(ctx context.Context) error { + return s.server.Shutdown(ctx) } diff --git a/server/main.go b/server/main.go index 4f1f8e09f2..374cc66394 100644 --- a/server/main.go +++ b/server/main.go @@ -1,10 +1,16 @@ package main import ( + "context" + "errors" "flag" "log/slog" + "net/http" "os" + "os/signal" "strconv" + "syscall" + "time" "github.com/tidwall/sjson" @@ -37,7 +43,7 @@ func main() { flag.StringVar(&httpServerConfig.AppId, "appId", os.Getenv("AGORA_APP_ID"), "agora appid") flag.StringVar(&httpServerConfig.AppCertificate, "appCertificate", os.Getenv("AGORA_APP_CERTIFICATE"), "agora certificate") - flag.StringVar(&httpServerConfig.Port, "port", ":8080", "http server port") + flag.StringVar(&httpServerConfig.Address, "port", ":8080", "http server listen address") flag.StringVar(&httpServerConfig.TTSVendorChinese, "ttsVendorChinese", ttsVendorChinese, "tts vendor for chinese") flag.StringVar(&httpServerConfig.TTSVendorEnglish, "ttsVendorEnglish", ttsVendorEnglish, "tts vendor for english") flag.IntVar(&httpServerConfig.WorkersMax, "workersMax", workersMax, "workers max") @@ -50,7 +56,34 @@ func main() { processManifest(service.ManifestJsonFile) processManifest(service.ManifestJsonFileElevenlabs) httpServer := internal.NewHttpServer(httpServerConfig) - httpServer.Start() + + errCh := make(chan error, 1) + go func() { + defer close(errCh) + err := httpServer.Run() + if errors.Is(err, http.ErrServerClosed) { + errCh <- nil + } + errCh <- err + }() + + sigCh := make(chan os.Signal, 1) + defer close(sigCh) + signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT) + <-sigCh + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Second*3) + err = httpServer.Shutdown(ctx) + if err != nil { + slog.Error("httpServer Shutdown error", "err", err) + } + defer cancel() // fix warning lostcancel + + err = <-errCh + if err != nil { + panic(err) + } } func processManifest(manifestJsonFile string) (err error) { From e1495b638cf4fd83cef0097f6224c998390fc146 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E6=A5=BD=E5=9D=82=E3=83=8B=E3=83=A3=E3=83=B3?= Date: Tue, 9 Jul 2024 14:01:17 +0000 Subject: [PATCH 3/9] chore(server): optimize config --- server/internal/http_server.go | 26 ++++++-------------------- server/main.go | 2 +- 2 files changed, 7 insertions(+), 21 deletions(-) diff --git a/server/internal/http_server.go b/server/internal/http_server.go index 5cf6255c61..a1c9b5c4df 100644 --- a/server/internal/http_server.go +++ b/server/internal/http_server.go @@ -21,26 +21,21 @@ import ( ) type HttpServer struct { - config *HttpServerConfig + config HttpServerConfig server *http.Server } type HttpServerConfig struct { - AppId string - AppCertificate string - ManifestJsonFile string - Address string - TTSVendorChinese string - TTSVendorEnglish string - WorkersMax int - WorkerQuitTimeoutSeconds int + Address string + + service.MainServiceConfig } var ( logTag = slog.String("service", "HTTP_SERVER") ) -func NewHttpServer(httpServerConfig *HttpServerConfig) *HttpServer { +func NewHttpServer(httpServerConfig HttpServerConfig) *HttpServer { return &HttpServer{ config: httpServerConfig, } @@ -50,16 +45,7 @@ func (s *HttpServer) Run() error { r := gin.Default() r.Use(corsMiddleware()) - mainSvcConf := service.MainServiceConfig{ - AppId: s.config.AppId, - AppCertificate: s.config.AppCertificate, - ManifestJsonFile: s.config.ManifestJsonFile, - TTSVendorChinese: s.config.TTSVendorChinese, - TTSVendorEnglish: s.config.TTSVendorEnglish, - WorkersMax: s.config.WorkersMax, - WorkerQuitTimeoutSeconds: s.config.WorkerQuitTimeoutSeconds, - } - mainSvc := service.NewMainService(mainSvcConf) + mainSvc := service.NewMainService(s.config.MainServiceConfig) go mainSvc.CleanWorker() router.Apply(r, mainSvc) diff --git a/server/main.go b/server/main.go index 374cc66394..d55a2a1400 100644 --- a/server/main.go +++ b/server/main.go @@ -19,7 +19,7 @@ import ( ) func main() { - httpServerConfig := &internal.HttpServerConfig{} + httpServerConfig := internal.HttpServerConfig{} ttsVendorChinese := os.Getenv("TTS_VENDOR_CHINESE") if len(ttsVendorChinese) == 0 { From 675f4eadea9b4d3b47f8d3d686bf97f697f9e3f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E6=A5=BD=E5=9D=82=E3=83=8B=E3=83=A3=E3=83=B3?= Date: Tue, 9 Jul 2024 14:28:00 +0000 Subject: [PATCH 4/9] fix(server): fixed return the shadowing err variable --- server/internal/service/service.go | 21 ++++++++++++--------- server/internal/service/worker.go | 18 +++++++++--------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/server/internal/service/service.go b/server/internal/service/service.go index 22cdc1ce62..2da1cc8600 100644 --- a/server/internal/service/service.go +++ b/server/internal/service/service.go @@ -159,9 +159,9 @@ func (s *MainService) HandlerStart(c *gin.Context) { return } - manifestJsonFile, logFile, err := s.processWorkerManifest(&req) + manifestJsonFile, logFile, err := s.createWorkerManifest(&req) if err != nil { - slog.Error("handlerStart process manifest", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + slog.Error("handlerStart create worker manifest", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) s.output(c, common.CodeErrProcessManifestFailed, http.StatusInternalServerError) return } @@ -247,13 +247,12 @@ func (s *MainService) HandlerGenerateToken(c *gin.Context) { s.output(c, common.CodeSuccess, map[string]any{"appId": s.config.AppId, "token": token, "channel_name": req.ChannelName, "uid": req.Uid}) } -// processWorkerManifest create channel temporary Mainfest. -func (s *MainService) processWorkerManifest(req *common.StartReq) (manifestJsonFile string, logFile string, err error) { - manifestJsonFile = s.getManifestJsonFile(req.AgoraAsrLanguage) +// createWorkerManifest create worker temporary Mainfest. +func (s *MainService) createWorkerManifest(req *common.StartReq) (manifestJsonFile string, logFile string, err error) { content, err := os.ReadFile(s.config.ManifestJsonFile) if err != nil { slog.Error("handlerStart read manifest.json failed", "err", err, "manifestJsonFile", s.config.ManifestJsonFile, "requestId", req.RequestId, logTag) - return + return "", "", err } manifestJson := string(content) @@ -269,7 +268,7 @@ func (s *MainService) processWorkerManifest(req *common.StartReq) (manifestJsonF token, err = rtctokenbuilder.BuildTokenWithUid(appId, s.config.AppCertificate, req.ChannelName, 0, rtctokenbuilder.RoleSubscriber, tokenExpirationInSeconds, privilegeExpirationInSeconds) if err != nil { slog.Error("handlerStart generate token failed", "err", err, "requestId", req.RequestId, logTag) - return + return "", "", err } } @@ -300,9 +299,13 @@ func (s *MainService) processWorkerManifest(req *common.StartReq) (manifestJsonF ts := time.Now().UnixNano() manifestJsonFile = fmt.Sprintf("/tmp/manifest-%s-%d.json", channelNameMd5, ts) logFile = fmt.Sprintf("/tmp/app-%s-%d.log", channelNameMd5, ts) - os.WriteFile(manifestJsonFile, []byte(manifestJson), 0644) + err = os.WriteFile(manifestJsonFile, []byte(manifestJson), 0644) + if err != nil { + slog.Error("handlerStart write manifest.json failed", "err", err, "manifestJsonFile", s.config.ManifestJsonFile, "requestId", req.RequestId, logTag) + return "", "", err + } - return + return manifestJsonFile, logFile, nil } // CleanWorker clean unused workers in background. diff --git a/server/internal/service/worker.go b/server/internal/service/worker.go index 727f2f513f..d24613dfa1 100644 --- a/server/internal/service/worker.go +++ b/server/internal/service/worker.go @@ -36,12 +36,12 @@ func newWorker(channelName string, logFile string, manifestJsonFile string) *Wor } } -func (w *Worker) start(req *common.StartReq) (err error) { +func (w *Worker) start(req *common.StartReq) error { shell := fmt.Sprintf("cd /app/agents && nohup %s --manifest %s > %s 2>&1 &", workerExec, w.ManifestJsonFile, w.LogFile) slog.Info("Worker start", "requestId", req.RequestId, "shell", shell, logTag) - if _, err = exec.Command("sh", "-c", shell).CombinedOutput(); err != nil { + if _, err := exec.Command("sh", "-c", shell).CombinedOutput(); err != nil { slog.Error("Worker start failed", "err", err, "requestId", req.RequestId, logTag) - return + return err } shell = fmt.Sprintf("ps aux | grep %s | grep -v grep | awk '{print $2}'", w.ManifestJsonFile) @@ -49,29 +49,29 @@ func (w *Worker) start(req *common.StartReq) (err error) { output, err := exec.Command("sh", "-c", shell).CombinedOutput() if err != nil { slog.Error("Worker get pid failed", "err", err, "requestId", req.RequestId, logTag) - return + return err } pid, err := strconv.Atoi(strings.TrimSpace(string(output))) if err != nil || pid <= 0 { slog.Error("Worker convert pid failed", "err", err, "pid", pid, "requestId", req.RequestId, logTag) - return + return err } w.Pid = pid - return + return nil } -func (w *Worker) stop(requestId string, channelName string) (err error) { +func (w *Worker) stop(requestId string, channelName string) error { slog.Info("Worker stop start", "channelName", channelName, "requestId", requestId, logTag) shell := fmt.Sprintf("kill -9 %d", w.Pid) output, err := exec.Command("sh", "-c", shell).CombinedOutput() if err != nil { slog.Error("Worker kill failed", "err", err, "output", output, "channelName", channelName, "worker", w, "requestId", requestId, logTag) - return + return err } slog.Info("Worker stop end", "channelName", channelName, "worker", w, "requestId", requestId, logTag) - return + return err } From 96146fb1abefaa3f06fde3ef2153e95ffeb7f14d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E6=A5=BD=E5=9D=82=E3=83=8B=E3=83=A3=E3=83=B3?= Date: Tue, 9 Jul 2024 14:35:35 +0000 Subject: [PATCH 5/9] feat(server): lreadonly manifest.json --- server/internal/service/service.go | 24 ++++++++---------------- server/main.go | 26 ++++++++++++++++++++------ 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/server/internal/service/service.go b/server/internal/service/service.go index 2da1cc8600..2b1483648c 100644 --- a/server/internal/service/service.go +++ b/server/internal/service/service.go @@ -26,9 +26,6 @@ const ( languageChinese = "zh-CN" languageEnglish = "en-US" - ManifestJsonFile = "./agents/manifest.json" - ManifestJsonFileElevenlabs = "./agents/manifest.elevenlabs.json" - TTSVendorAzure = "azure" TTSVendorElevenlabs = "elevenlabs" @@ -70,7 +67,8 @@ type MainService struct { type MainServiceConfig struct { AppId string AppCertificate string - ManifestJsonFile string + ManifestJson string + ManifestJsonElevenlabs string TTSVendorChinese string TTSVendorEnglish string WorkersMax int @@ -249,13 +247,7 @@ func (s *MainService) HandlerGenerateToken(c *gin.Context) { // createWorkerManifest create worker temporary Mainfest. func (s *MainService) createWorkerManifest(req *common.StartReq) (manifestJsonFile string, logFile string, err error) { - content, err := os.ReadFile(s.config.ManifestJsonFile) - if err != nil { - slog.Error("handlerStart read manifest.json failed", "err", err, "manifestJsonFile", s.config.ManifestJsonFile, "requestId", req.RequestId, logTag) - return "", "", err - } - - manifestJson := string(content) + manifestJson := s.getManifestJson(req.AgoraAsrLanguage) if s.config.AppId != "" { manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`, s.config.AppId) @@ -301,7 +293,7 @@ func (s *MainService) createWorkerManifest(req *common.StartReq) (manifestJsonFi logFile = fmt.Sprintf("/tmp/app-%s-%d.log", channelNameMd5, ts) err = os.WriteFile(manifestJsonFile, []byte(manifestJson), 0644) if err != nil { - slog.Error("handlerStart write manifest.json failed", "err", err, "manifestJsonFile", s.config.ManifestJsonFile, "requestId", req.RequestId, logTag) + slog.Error("handlerStart write manifest.json failed", "err", err, "manifestJsonFile", manifestJsonFile, "requestId", req.RequestId, logTag) return "", "", err } @@ -330,15 +322,15 @@ func (s *MainService) CleanWorker() { } } -func (s *MainService) getManifestJsonFile(language string) (manifestJsonFile string) { +func (s *MainService) getManifestJson(language string) (manifestJson string) { ttsVendor := s.getTtsVendor(language) - manifestJsonFile = ManifestJsonFile + manifestJson = s.config.ManifestJson if ttsVendor == TTSVendorElevenlabs { - manifestJsonFile = ManifestJsonFileElevenlabs + manifestJson = s.config.ManifestJsonElevenlabs } - return + return manifestJson } func (s *MainService) getTtsVendor(language string) string { diff --git a/server/main.go b/server/main.go index d55a2a1400..bea958832a 100644 --- a/server/main.go +++ b/server/main.go @@ -18,6 +18,11 @@ import ( "app/internal/service" ) +const ( + ManifestJsonFile = "./agents/manifest.json" + ManifestJsonFileElevenlabs = "./agents/manifest.elevenlabs.json" +) + func main() { httpServerConfig := internal.HttpServerConfig{} @@ -41,6 +46,8 @@ func main() { workerQuitTimeoutSeconds = 60 } + var manifestJsonFile string + flag.StringVar(&httpServerConfig.AppId, "appId", os.Getenv("AGORA_APP_ID"), "agora appid") flag.StringVar(&httpServerConfig.AppCertificate, "appCertificate", os.Getenv("AGORA_APP_CERTIFICATE"), "agora certificate") flag.StringVar(&httpServerConfig.Address, "port", ":8080", "http server listen address") @@ -53,8 +60,16 @@ func main() { slog.Info("server config", "ttsVendorChinese", httpServerConfig.TTSVendorChinese, "ttsVendorEnglish", httpServerConfig.TTSVendorEnglish, "workersMax", httpServerConfig.WorkersMax, "workerQuitTimeoutSeconds", httpServerConfig.WorkerQuitTimeoutSeconds) - processManifest(service.ManifestJsonFile) - processManifest(service.ManifestJsonFileElevenlabs) + httpServerConfig.ManifestJson, err = loadManifest(manifestJsonFile) + if err != nil { + panic(err) + } + + httpServerConfig.ManifestJsonElevenlabs, err = loadManifest(ManifestJsonFileElevenlabs) + if err != nil { + panic(err) + } + httpServer := internal.NewHttpServer(httpServerConfig) errCh := make(chan error, 1) @@ -86,11 +101,11 @@ func main() { } } -func processManifest(manifestJsonFile string) (err error) { +func loadManifest(manifestJsonFile string) (string, error) { content, err := os.ReadFile(manifestJsonFile) if err != nil { slog.Error("read manifest.json failed", "err", err, "manifestJsonFile", manifestJsonFile) - return + return "", err } manifestJson := string(content) @@ -145,6 +160,5 @@ func processManifest(manifestJsonFile string) (err error) { manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="elevenlabs_tts").property.api_key`, elevenlabsTtsKey) } - err = os.WriteFile(manifestJsonFile, []byte(manifestJson), 0644) - return + return manifestJson, nil } From 2a6113156eee1513b6098be49c2048c371059889 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E6=A5=BD=E5=9D=82=E3=83=8B=E3=83=A3=E3=83=B3?= Date: Tue, 9 Jul 2024 16:08:16 +0000 Subject: [PATCH 6/9] fix manifest after rebase --- server/main.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/main.go b/server/main.go index bea958832a..9ff78eba81 100644 --- a/server/main.go +++ b/server/main.go @@ -46,8 +46,6 @@ func main() { workerQuitTimeoutSeconds = 60 } - var manifestJsonFile string - flag.StringVar(&httpServerConfig.AppId, "appId", os.Getenv("AGORA_APP_ID"), "agora appid") flag.StringVar(&httpServerConfig.AppCertificate, "appCertificate", os.Getenv("AGORA_APP_CERTIFICATE"), "agora certificate") flag.StringVar(&httpServerConfig.Address, "port", ":8080", "http server listen address") @@ -60,7 +58,7 @@ func main() { slog.Info("server config", "ttsVendorChinese", httpServerConfig.TTSVendorChinese, "ttsVendorEnglish", httpServerConfig.TTSVendorEnglish, "workersMax", httpServerConfig.WorkersMax, "workerQuitTimeoutSeconds", httpServerConfig.WorkerQuitTimeoutSeconds) - httpServerConfig.ManifestJson, err = loadManifest(manifestJsonFile) + httpServerConfig.ManifestJson, err = loadManifest(ManifestJsonFile) if err != nil { panic(err) } From b0965be54d990239608d597b5b7cbdfed9b625d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E6=A5=BD=E5=9D=82=E3=83=8B=E3=83=A3=E3=83=B3?= Date: Tue, 9 Jul 2024 19:16:24 +0000 Subject: [PATCH 7/9] feat(server): add ManifestProvider & TtsProvider. enhance #72 --- server/internal/http_server.go | 37 +++---- server/internal/provider/manifest.go | 117 ++++++++++++++++++++ server/internal/service/service.go | 120 ++++++++------------- server/internal/service/worker.go | 2 +- server/main.go | 119 ++++++-------------- server/{internal => pkg}/common/code.go | 0 server/{internal => pkg}/common/request.go | 24 ++++- server/pkg/provider/tts.go | 24 +++++ server/third_party/azure/tts.go | 52 +++++++++ server/third_party/elevenlabs/tts.go | 52 +++++++++ 10 files changed, 362 insertions(+), 185 deletions(-) create mode 100644 server/internal/provider/manifest.go rename server/{internal => pkg}/common/code.go (100%) rename server/{internal => pkg}/common/request.go (50%) create mode 100644 server/pkg/provider/tts.go create mode 100644 server/third_party/azure/tts.go create mode 100644 server/third_party/elevenlabs/tts.go diff --git a/server/internal/http_server.go b/server/internal/http_server.go index a1c9b5c4df..e35fb94a98 100644 --- a/server/internal/http_server.go +++ b/server/internal/http_server.go @@ -21,40 +21,41 @@ import ( ) type HttpServer struct { - config HttpServerConfig + deps HttpServerDepends server *http.Server } +type HttpServerDepends struct { + Config HttpServerConfig + MainSvc *service.MainService +} + type HttpServerConfig struct { Address string - - service.MainServiceConfig } var ( logTag = slog.String("service", "HTTP_SERVER") ) -func NewHttpServer(httpServerConfig HttpServerConfig) *HttpServer { - return &HttpServer{ - config: httpServerConfig, - } -} - -func (s *HttpServer) Run() error { +func NewHttpServer(deps HttpServerDepends) *HttpServer { r := gin.Default() r.Use(corsMiddleware()) - mainSvc := service.NewMainService(s.config.MainServiceConfig) - go mainSvc.CleanWorker() - router.Apply(r, mainSvc) - - slog.Info("server start", "address", s.config.Address, logTag) + router.Apply(r, deps.MainSvc) - s.server = &http.Server{ - Addr: s.config.Address, - Handler: r, + return &HttpServer{ + deps: deps, + server: &http.Server{ + Addr: deps.Config.Address, + Handler: r, + }, } +} + +func (s *HttpServer) Run() error { + slog.Info("server start", "address", s.server.Addr, logTag) + go s.deps.MainSvc.CleanWorker() return s.server.ListenAndServe() } diff --git a/server/internal/provider/manifest.go b/server/internal/provider/manifest.go new file mode 100644 index 0000000000..63a36439eb --- /dev/null +++ b/server/internal/provider/manifest.go @@ -0,0 +1,117 @@ +package provider + +import ( + "log/slog" + "os" + "path/filepath" + "regexp" + + "github.com/tidwall/sjson" +) + +type ManifestProvider struct { + // manifestJsons + // key: fileName `manifest.json` `manifest.elevenlabs.json` + // value: manifestJson + manifestJsons map[string]string +} + +func NewManifestProvider() *ManifestProvider { + return &ManifestProvider{ + manifestJsons: make(map[string]string), + } +} + +func (p *ManifestProvider) LoadManifest(manifestJsonDir string) error { + files, err := os.ReadDir(manifestJsonDir) + if err != nil { + slog.Error("read manifestJsonDir failed", "err", err, "manifestJsonDir", manifestJsonDir) + return err + } + + matcher := regexp.MustCompile(`^manifest(\..+)?\.json$`) + for _, file := range files { + if file.IsDir() { + continue + } + if !matcher.MatchString(file.Name()) { + continue + } + + filePath := filepath.Join(manifestJsonDir, file.Name()) + content, err := os.ReadFile(filePath) + if err != nil { + slog.Error("read manifest.json failed", "err", err, "filePath", filePath) + return err + } + + manifestJson := string(content) + manifestJson = p.injectEnvVar(manifestJson) + + p.manifestJsons[file.Name()] = manifestJson + } + + return nil +} + +func (p *ManifestProvider) injectEnvVar(manifestJson string) string { + appId := os.Getenv("AGORA_APP_ID") + if appId != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`, appId) + } + + azureSttKey := os.Getenv("AZURE_STT_KEY") + if azureSttKey != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_vendor_key`, azureSttKey) + } + + azureSttRegion := os.Getenv("AZURE_STT_REGION") + if azureSttRegion != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_vendor_region`, azureSttRegion) + } + + openaiBaseUrl := os.Getenv("OPENAI_BASE_URL") + if openaiBaseUrl != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.base_url`, openaiBaseUrl) + } + + openaiApiKey := os.Getenv("OPENAI_API_KEY") + if openaiApiKey != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.api_key`, openaiApiKey) + } + + openaiModel := os.Getenv("OPENAI_MODEL") + if openaiModel != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.model`, openaiModel) + } + + proxyUrl := os.Getenv("PROXY_URL") + if proxyUrl != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.proxy_url`, proxyUrl) + } + + azureTtsKey := os.Getenv("AZURE_TTS_KEY") + if azureTtsKey != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="azure_tts").property.azure_subscription_key`, azureTtsKey) + } + + azureTtsRegion := os.Getenv("AZURE_TTS_REGION") + if azureTtsRegion != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="azure_tts").property.azure_subscription_region`, azureTtsRegion) + } + + elevenlabsTtsKey := os.Getenv("ELEVENLABS_TTS_KEY") + if elevenlabsTtsKey != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="elevenlabs_tts").property.api_key`, elevenlabsTtsKey) + } + + return manifestJson +} + +func (p *ManifestProvider) GetManifestJson(vendor string) (string, bool) { + if len(vendor) > 0 { + vendor = "." + vendor + } + manifestJson, ok := p.manifestJsons["manifest"+vendor+".json"] + return manifestJson, ok +} diff --git a/server/internal/service/service.go b/server/internal/service/service.go index 2b1483648c..4b83562068 100644 --- a/server/internal/service/service.go +++ b/server/internal/service/service.go @@ -1,7 +1,10 @@ package service import ( - "app/internal/common" + "app/internal/provider" + "app/pkg/common" + pkgProvider "app/pkg/provider" + "errors" "fmt" "log/slog" "net/http" @@ -22,62 +25,34 @@ import ( const ( privilegeExpirationInSeconds = uint32(86400) tokenExpirationInSeconds = uint32(86400) - - languageChinese = "zh-CN" - languageEnglish = "en-US" - - TTSVendorAzure = "azure" - TTSVendorElevenlabs = "elevenlabs" - - voiceTypeMale = "male" - voiceTypeFemale = "female" ) var ( - voiceNameMap = map[string]map[string]map[string]string{ - languageChinese: { - TTSVendorAzure: { - voiceTypeMale: "zh-CN-YunxiNeural", - voiceTypeFemale: "zh-CN-XiaoxiaoNeural", - }, - TTSVendorElevenlabs: { - voiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam - voiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice - }, - }, - languageEnglish: { - TTSVendorAzure: { - voiceTypeMale: "en-US-BrianNeural", - voiceTypeFemale: "en-US-JaneNeural", - }, - TTSVendorElevenlabs: { - voiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam - voiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice - }, - }, - } logTag = slog.String("service", "MAIN_SERVICE") ) type MainService struct { - config MainServiceConfig + deps MainServiceDepends workers *gmap.Map } +type MainServiceDepends struct { + Config MainServiceConfig + ManifestProvider *provider.ManifestProvider +} + type MainServiceConfig struct { AppId string AppCertificate string - ManifestJson string - ManifestJsonElevenlabs string TTSVendorChinese string TTSVendorEnglish string WorkersMax int WorkerQuitTimeoutSeconds int } -func NewMainService(config MainServiceConfig) *MainService { +func NewMainService(deps MainServiceDepends) *MainService { return &MainService{ - config: config, + deps: deps, workers: gmap.New(true), } } @@ -145,8 +120,8 @@ func (s *MainService) HandlerStart(c *gin.Context) { return } - if workersRunning >= s.config.WorkersMax { - slog.Error("handlerStart workers exceed", "workersRunning", workersRunning, "WorkersMax", s.config.WorkersMax, "requestId", req.RequestId, logTag) + if workersRunning >= s.deps.Config.WorkersMax { + slog.Error("handlerStart workers exceed", "workersRunning", workersRunning, "WorkersMax", s.deps.Config.WorkersMax, "requestId", req.RequestId, logTag) s.output(c, common.CodeErrWorkersLimit, http.StatusTooManyRequests) return } @@ -165,7 +140,7 @@ func (s *MainService) HandlerStart(c *gin.Context) { } worker := newWorker(req.ChannelName, logFile, manifestJsonFile) - worker.QuitTimeoutSeconds = s.config.WorkerQuitTimeoutSeconds + worker.QuitTimeoutSeconds = s.deps.Config.WorkerQuitTimeoutSeconds if err := worker.start(&req); err != nil { slog.Error("handlerStart start worker failed", "err", err, "requestId", req.RequestId, logTag) s.output(c, common.CodeErrStartWorkerFailed, http.StatusInternalServerError) @@ -229,12 +204,12 @@ func (s *MainService) HandlerGenerateToken(c *gin.Context) { return } - if s.config.AppCertificate == "" { - s.output(c, common.CodeSuccess, map[string]any{"appId": s.config.AppId, "token": s.config.AppId, "channel_name": req.ChannelName, "uid": req.Uid}) + if s.deps.Config.AppCertificate == "" { + s.output(c, common.CodeSuccess, map[string]any{"appId": s.deps.Config.AppId, "token": s.deps.Config.AppId, "channel_name": req.ChannelName, "uid": req.Uid}) return } - token, err := rtctokenbuilder.BuildTokenWithUid(s.config.AppId, s.config.AppCertificate, req.ChannelName, req.Uid, rtctokenbuilder.RolePublisher, tokenExpirationInSeconds, privilegeExpirationInSeconds) + token, err := rtctokenbuilder.BuildTokenWithUid(s.deps.Config.AppId, s.deps.Config.AppCertificate, req.ChannelName, req.Uid, rtctokenbuilder.RolePublisher, tokenExpirationInSeconds, privilegeExpirationInSeconds) if err != nil { slog.Error("handlerGenerateToken generate token failed", "err", err, "requestId", req.RequestId, logTag) s.output(c, common.CodeErrGenerateTokenFailed, http.StatusBadRequest) @@ -242,22 +217,35 @@ func (s *MainService) HandlerGenerateToken(c *gin.Context) { } slog.Info("handlerGenerateToken end", "requestId", req.RequestId, logTag) - s.output(c, common.CodeSuccess, map[string]any{"appId": s.config.AppId, "token": token, "channel_name": req.ChannelName, "uid": req.Uid}) + s.output(c, common.CodeSuccess, map[string]any{"appId": s.deps.Config.AppId, "token": token, "channel_name": req.ChannelName, "uid": req.Uid}) } // createWorkerManifest create worker temporary Mainfest. func (s *MainService) createWorkerManifest(req *common.StartReq) (manifestJsonFile string, logFile string, err error) { - manifestJson := s.getManifestJson(req.AgoraAsrLanguage) + vendor := s.getTtsVendor(req.AgoraAsrLanguage) + tts := pkgProvider.GetTts(vendor) + if tts == nil { + err = errors.New(fmt.Sprintf("unknow tts vendor", vendor)) + slog.Error("handlerStart generate token failed", "err", err, "requestId", req.RequestId, logTag) + return "", "", err + } + + manifestJson, ok := s.deps.ManifestProvider.GetManifestJson(vendor) + if !ok { + err = errors.New(fmt.Sprintf("unknow manifest vendor", vendor)) + slog.Error("handlerStart get manifest json failed", "err", err, "requestId", req.RequestId, logTag) + return "", "", err + } - if s.config.AppId != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`, s.config.AppId) + if s.deps.Config.AppId != "" { + manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`, s.deps.Config.AppId) } appId := gjson.Get(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`).String() // Generate token token := appId - if s.config.AppCertificate != "" { - token, err = rtctokenbuilder.BuildTokenWithUid(appId, s.config.AppCertificate, req.ChannelName, 0, rtctokenbuilder.RoleSubscriber, tokenExpirationInSeconds, privilegeExpirationInSeconds) + if s.deps.Config.AppCertificate != "" { + token, err = rtctokenbuilder.BuildTokenWithUid(appId, s.deps.Config.AppCertificate, req.ChannelName, 0, rtctokenbuilder.RoleSubscriber, tokenExpirationInSeconds, privilegeExpirationInSeconds) if err != nil { slog.Error("handlerStart generate token failed", "err", err, "requestId", req.RequestId, logTag) return "", "", err @@ -275,16 +263,11 @@ func (s *MainService) createWorkerManifest(req *common.StartReq) (manifestJsonFi manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.remote_stream_id`, req.RemoteStreamId) } - language := gjson.Get(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_language`).String() - - ttsVendor := s.getTtsVendor(language) - voiceName := voiceNameMap[language][ttsVendor][req.VoiceType] - if voiceName != "" { - if ttsVendor == TTSVendorAzure { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="azure_tts").property.azure_synthesis_voice_name`, voiceName) - } else if ttsVendor == TTSVendorElevenlabs { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="elevenlabs_tts").property.voice_id`, voiceName) - } + language := gjson.Get(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_language`).String() //TODO check is correct? not req.AgoraAsrLanguage? + manifestJson, err = tts.ProcessManifest(manifestJson, common.Language(language), req.VoiceType) + if err != nil { + slog.Error("handlerStart tts ProcessManifest failed", "err", err, "requestId", req.RequestId, logTag) + return "", "", err } channelNameMd5 := gmd5.MustEncryptString(req.ChannelName) @@ -322,21 +305,10 @@ func (s *MainService) CleanWorker() { } } -func (s *MainService) getManifestJson(language string) (manifestJson string) { - ttsVendor := s.getTtsVendor(language) - manifestJson = s.config.ManifestJson - - if ttsVendor == TTSVendorElevenlabs { - manifestJson = s.config.ManifestJsonElevenlabs - } - - return manifestJson -} - -func (s *MainService) getTtsVendor(language string) string { - if language == languageChinese { - return s.config.TTSVendorChinese +func (s *MainService) getTtsVendor(language common.Language) string { + if language == common.LanguageChinese { + return s.deps.Config.TTSVendorChinese } - return s.config.TTSVendorEnglish + return s.deps.Config.TTSVendorEnglish } diff --git a/server/internal/service/worker.go b/server/internal/service/worker.go index d24613dfa1..3e4f7f1694 100644 --- a/server/internal/service/worker.go +++ b/server/internal/service/worker.go @@ -1,7 +1,7 @@ package service import ( - "app/internal/common" + "app/pkg/common" "fmt" "log/slog" "os/exec" diff --git a/server/main.go b/server/main.go index 9ff78eba81..400ee6b274 100644 --- a/server/main.go +++ b/server/main.go @@ -12,28 +12,21 @@ import ( "syscall" "time" - "github.com/tidwall/sjson" - "app/internal" + "app/internal/provider" "app/internal/service" -) - -const ( - ManifestJsonFile = "./agents/manifest.json" - ManifestJsonFileElevenlabs = "./agents/manifest.elevenlabs.json" + "app/third_party/azure" ) func main() { - httpServerConfig := internal.HttpServerConfig{} - ttsVendorChinese := os.Getenv("TTS_VENDOR_CHINESE") if len(ttsVendorChinese) == 0 { - ttsVendorChinese = service.TTSVendorAzure //TODO vendor provider + ttsVendorChinese = azure.NAME } ttsVendorEnglish := os.Getenv("TTS_VENDOR_ENGLISH") if len(ttsVendorEnglish) == 0 { - ttsVendorEnglish = service.TTSVendorAzure //TODO vendor provider + ttsVendorEnglish = azure.NAME } workersMax, err := strconv.Atoi(os.Getenv("WORKERS_MAX")) @@ -46,29 +39,43 @@ func main() { workerQuitTimeoutSeconds = 60 } - flag.StringVar(&httpServerConfig.AppId, "appId", os.Getenv("AGORA_APP_ID"), "agora appid") - flag.StringVar(&httpServerConfig.AppCertificate, "appCertificate", os.Getenv("AGORA_APP_CERTIFICATE"), "agora certificate") + var manifestJsonDir string + flag.StringVar(&manifestJsonDir, "manifestJsonDir", "./agents/", "manifest json dir") + + httpServerConfig := internal.HttpServerConfig{} flag.StringVar(&httpServerConfig.Address, "port", ":8080", "http server listen address") - flag.StringVar(&httpServerConfig.TTSVendorChinese, "ttsVendorChinese", ttsVendorChinese, "tts vendor for chinese") - flag.StringVar(&httpServerConfig.TTSVendorEnglish, "ttsVendorEnglish", ttsVendorEnglish, "tts vendor for english") - flag.IntVar(&httpServerConfig.WorkersMax, "workersMax", workersMax, "workers max") - flag.IntVar(&httpServerConfig.WorkerQuitTimeoutSeconds, "workerQuitTimeoutSeconds", workerQuitTimeoutSeconds, "worker quit timeout seconds") + + mainServiceConfig := service.MainServiceConfig{} + flag.StringVar(&mainServiceConfig.AppId, "appId", os.Getenv("AGORA_APP_ID"), "agora appid") + flag.StringVar(&mainServiceConfig.AppCertificate, "appCertificate", os.Getenv("AGORA_APP_CERTIFICATE"), "agora certificate") + flag.StringVar(&mainServiceConfig.TTSVendorChinese, "ttsVendorChinese", ttsVendorChinese, "tts vendor for chinese") + flag.StringVar(&mainServiceConfig.TTSVendorEnglish, "ttsVendorEnglish", ttsVendorEnglish, "tts vendor for english") + flag.IntVar(&mainServiceConfig.WorkersMax, "workersMax", workersMax, "workers max") + flag.IntVar(&mainServiceConfig.WorkerQuitTimeoutSeconds, "workerQuitTimeoutSeconds", workerQuitTimeoutSeconds, "worker quit timeout seconds") + flag.Parse() - slog.Info("server config", "ttsVendorChinese", httpServerConfig.TTSVendorChinese, "ttsVendorEnglish", httpServerConfig.TTSVendorEnglish, - "workersMax", httpServerConfig.WorkersMax, "workerQuitTimeoutSeconds", httpServerConfig.WorkerQuitTimeoutSeconds) + slog.Info("server config", + "ttsVendorChinese", mainServiceConfig.TTSVendorChinese, + "ttsVendorEnglish", mainServiceConfig.TTSVendorEnglish, + "workersMax", mainServiceConfig.WorkersMax, + "workerQuitTimeoutSeconds", mainServiceConfig.WorkerQuitTimeoutSeconds) - httpServerConfig.ManifestJson, err = loadManifest(ManifestJsonFile) + manifestProvider := provider.NewManifestProvider() + err = manifestProvider.LoadManifest(manifestJsonDir) if err != nil { panic(err) } - httpServerConfig.ManifestJsonElevenlabs, err = loadManifest(ManifestJsonFileElevenlabs) - if err != nil { - panic(err) - } + mainSvc := service.NewMainService(service.MainServiceDepends{ + Config: mainServiceConfig, + ManifestProvider: manifestProvider, + }) - httpServer := internal.NewHttpServer(httpServerConfig) + httpServer := internal.NewHttpServer(internal.HttpServerDepends{ + Config: httpServerConfig, + MainSvc: mainSvc, + }) errCh := make(chan error, 1) go func() { @@ -98,65 +105,3 @@ func main() { panic(err) } } - -func loadManifest(manifestJsonFile string) (string, error) { - content, err := os.ReadFile(manifestJsonFile) - if err != nil { - slog.Error("read manifest.json failed", "err", err, "manifestJsonFile", manifestJsonFile) - return "", err - } - - manifestJson := string(content) - - appId := os.Getenv("AGORA_APP_ID") - if appId != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`, appId) - } - - azureSttKey := os.Getenv("AZURE_STT_KEY") - if azureSttKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_vendor_key`, azureSttKey) - } - - azureSttRegion := os.Getenv("AZURE_STT_REGION") - if azureSttRegion != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_vendor_region`, azureSttRegion) - } - - openaiBaseUrl := os.Getenv("OPENAI_BASE_URL") - if openaiBaseUrl != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.base_url`, openaiBaseUrl) - } - - openaiApiKey := os.Getenv("OPENAI_API_KEY") - if openaiApiKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.api_key`, openaiApiKey) - } - - openaiModel := os.Getenv("OPENAI_MODEL") - if openaiModel != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.model`, openaiModel) - } - - proxyUrl := os.Getenv("PROXY_URL") - if proxyUrl != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.proxy_url`, proxyUrl) - } - - azureTtsKey := os.Getenv("AZURE_TTS_KEY") - if azureTtsKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="azure_tts").property.azure_subscription_key`, azureTtsKey) - } - - azureTtsRegion := os.Getenv("AZURE_TTS_REGION") - if azureTtsRegion != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="azure_tts").property.azure_subscription_region`, azureTtsRegion) - } - - elevenlabsTtsKey := os.Getenv("ELEVENLABS_TTS_KEY") - if elevenlabsTtsKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="elevenlabs_tts").property.api_key`, elevenlabsTtsKey) - } - - return manifestJson, nil -} diff --git a/server/internal/common/code.go b/server/pkg/common/code.go similarity index 100% rename from server/internal/common/code.go rename to server/pkg/common/code.go diff --git a/server/internal/common/request.go b/server/pkg/common/request.go similarity index 50% rename from server/internal/common/request.go rename to server/pkg/common/request.go index 0d48eb320b..d46adad5b8 100644 --- a/server/internal/common/request.go +++ b/server/pkg/common/request.go @@ -1,16 +1,30 @@ package common +type Language string + +const ( + LanguageChinese Language = "zh-CN" + LanguageEnglish Language = "en-US" +) + +type VoiceType string + +const ( + VoiceTypeMale = "male" + VoiceTypeFemale = "female" +) + type PingReq struct { RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` } type StartReq struct { - RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` - AgoraAsrLanguage string `form:"agora_asr_language,omitempty" json:"agora_asr_language,omitempty"` - ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` - RemoteStreamId uint32 `form:"remote_stream_id,omitempty" json:"remote_stream_id,omitempty"` - VoiceType string `form:"voice_type,omitempty" json:"voice_type,omitempty"` + RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` + AgoraAsrLanguage Language `form:"agora_asr_language,omitempty" json:"agora_asr_language,omitempty"` + ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` + RemoteStreamId uint32 `form:"remote_stream_id,omitempty" json:"remote_stream_id,omitempty"` + VoiceType VoiceType `form:"voice_type,omitempty" json:"voice_type,omitempty"` } type StopReq struct { diff --git a/server/pkg/provider/tts.go b/server/pkg/provider/tts.go new file mode 100644 index 0000000000..2e351aced3 --- /dev/null +++ b/server/pkg/provider/tts.go @@ -0,0 +1,24 @@ +package provider + +import "app/pkg/common" + +var registeredTts = make(map[string]ITtsProvider) + +type ITtsProvider interface { + Name() string + ProcessManifest(manifestJson string, language common.Language, voiceType common.VoiceType) (string, error) +} + +func RegisterTts(provider ITtsProvider) { + if provider == nil { + panic("cannot register a nil ITtsProvider") + } + if provider.Name() == "" { + panic("cannot register ITtsProvider with empty string result for Name()") + } + registeredTts[provider.Name()] = provider +} + +func GetTts(name string) ITtsProvider { + return registeredTts[name] +} diff --git a/server/third_party/azure/tts.go b/server/third_party/azure/tts.go new file mode 100644 index 0000000000..8ff4ba79ab --- /dev/null +++ b/server/third_party/azure/tts.go @@ -0,0 +1,52 @@ +package azure + +import ( + "app/pkg/common" + "app/pkg/provider" + "errors" + + "github.com/tidwall/sjson" +) + +const NAME string = "azure" + +func init() { + provider.RegisterTts(NewAzureTtsProvider()) +} + +type AzureTtsProvider struct { + voiceNameMap map[common.Language]map[common.VoiceType]string +} + +func NewAzureTtsProvider() provider.ITtsProvider { + return &AzureTtsProvider{ + voiceNameMap: map[common.Language]map[common.VoiceType]string{ + common.LanguageChinese: { + common.VoiceTypeMale: "zh-CN-YunxiNeural", + common.VoiceTypeFemale: "zh-CN-XiaoxiaoNeural", + }, + common.LanguageEnglish: { + common.VoiceTypeMale: "en-US-BrianNeural", + common.VoiceTypeFemale: "en-US-JaneNeural", + }, + }, + } +} + +// Name implements provider.ITtsProvider. +func (p *AzureTtsProvider) Name() string { + return NAME +} + +// ProcessManifest implements provider.ITtsProvider. +func (p *AzureTtsProvider) ProcessManifest(manifestJson string, language common.Language, voiceType common.VoiceType) (string, error) { + voiceTypeMap, ok := p.voiceNameMap[language] + if !ok { + return "", errors.New("unknow language") + } + voiceName, ok := voiceTypeMap[voiceType] + if !ok { + return "", errors.New("unknow voiceType") + } + return sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="azure_tts").property.azure_synthesis_voice_name`, voiceName) +} diff --git a/server/third_party/elevenlabs/tts.go b/server/third_party/elevenlabs/tts.go new file mode 100644 index 0000000000..b01519cacf --- /dev/null +++ b/server/third_party/elevenlabs/tts.go @@ -0,0 +1,52 @@ +package elevenlabs + +import ( + "app/pkg/common" + "app/pkg/provider" + "errors" + + "github.com/tidwall/sjson" +) + +const NAME string = "elevenlabs" + +func init() { + provider.RegisterTts(NewElevenLabsTtsProvider()) +} + +type ElevenLabsTtsProvider struct { + voiceNameMap map[common.Language]map[common.VoiceType]string +} + +func NewElevenLabsTtsProvider() provider.ITtsProvider { + return &ElevenLabsTtsProvider{ + voiceNameMap: map[common.Language]map[common.VoiceType]string{ + common.LanguageChinese: { + common.VoiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam + common.VoiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice + }, + common.LanguageEnglish: { + common.VoiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam + common.VoiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice + }, + }, + } +} + +// Name implements provider.ITtsProvider. +func (p *ElevenLabsTtsProvider) Name() string { + return NAME +} + +// ProcessManifest implements provider.ITtsProvider. +func (p *ElevenLabsTtsProvider) ProcessManifest(manifestJson string, language common.Language, voiceType common.VoiceType) (string, error) { + voiceTypeMap, ok := p.voiceNameMap[language] + if !ok { + return "", errors.New("unknow language") + } + voiceName, ok := voiceTypeMap[voiceType] + if !ok { + return "", errors.New("unknow voiceType") + } + return sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="elevenlabs_tts").property.voice_id`, voiceName) +} From 287949c148eb8d0ea42781759eefb9e5b8ef6403 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E6=A5=BD=E5=9D=82=E3=83=8B=E3=83=A3=E3=83=B3?= Date: Tue, 9 Jul 2024 19:38:03 +0000 Subject: [PATCH 8/9] feat(server): kill -9 is not recommand, syscall.Kill SIGTERM instead of. --- server/internal/service/worker.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/internal/service/worker.go b/server/internal/service/worker.go index 3e4f7f1694..b1546fdba1 100644 --- a/server/internal/service/worker.go +++ b/server/internal/service/worker.go @@ -7,6 +7,7 @@ import ( "os/exec" "strconv" "strings" + "syscall" "time" ) @@ -65,10 +66,9 @@ func (w *Worker) start(req *common.StartReq) error { func (w *Worker) stop(requestId string, channelName string) error { slog.Info("Worker stop start", "channelName", channelName, "requestId", requestId, logTag) - shell := fmt.Sprintf("kill -9 %d", w.Pid) - output, err := exec.Command("sh", "-c", shell).CombinedOutput() + err := syscall.Kill(w.Pid, syscall.SIGTERM) if err != nil { - slog.Error("Worker kill failed", "err", err, "output", output, "channelName", channelName, "worker", w, "requestId", requestId, logTag) + slog.Error("Worker kill failed", "err", err, "channelName", channelName, "worker", w, "requestId", requestId, logTag) return err } From f96d9268788077e1e33ac3f02b3117bac7d12c8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E6=A5=BD=E5=9D=82=E3=83=8B=E3=83=A3=E3=83=B3?= Date: Wed, 10 Jul 2024 05:32:43 +0000 Subject: [PATCH 9/9] rm todo --- server/internal/service/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/internal/service/service.go b/server/internal/service/service.go index 4b83562068..f48d5bf054 100644 --- a/server/internal/service/service.go +++ b/server/internal/service/service.go @@ -263,7 +263,7 @@ func (s *MainService) createWorkerManifest(req *common.StartReq) (manifestJsonFi manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.remote_stream_id`, req.RemoteStreamId) } - language := gjson.Get(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_language`).String() //TODO check is correct? not req.AgoraAsrLanguage? + language := gjson.Get(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_language`).String() manifestJson, err = tts.ProcessManifest(manifestJson, common.Language(language), req.VoiceType) if err != nil { slog.Error("handlerStart tts ProcessManifest failed", "err", err, "requestId", req.RequestId, logTag)