diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index a3e0182..23e77e0 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -45,14 +45,12 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: '1.23' + go-version: '1.24.3' - name: golangci-lint - uses: golangci/golangci-lint-action@v3 + uses: golangci/golangci-lint-action@v8 with: - version: v1.61 - skip-pkg-cache: true - skip-build-cache: true + version: v2.1.0 - name: Build if: ${{ matrix.app != 'pkg' && matrix.app != 'internal' }} @@ -120,13 +118,13 @@ jobs: working-directory: pkg approve: - needs: integration-tests + needs: [build, integration-tests] runs-on: ubuntu-latest if: ${{ always() }} steps: - - name: Approve Build - if: ${{ needs.integration-tests.result == 'success' || needs.integration-tests.result == 'skipped'}} + - name: Approve + if: ${{ needs.build.result == 'success' && needs.integration-tests.result == 'success' }} uses: actions/github-script@v7 with: script: | @@ -138,8 +136,8 @@ jobs: }) process.exit(0) - - name: Fail Build - if: ${{ needs.integration-tests.result == 'failure' }} + - name: Reject + if: ${{ needs.build.result != 'success' || needs.integration-tests.result != 'success' }} uses: actions/github-script@v7 with: script: | diff --git a/.golangci.yml b/.golangci.yml index 8290518..40c7890 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,21 +1,6 @@ -# This configuration file is not a recommendation. -# -# We intentionally use a limited set of linters. -# This configuration file is used with different version of golangci-lint to avoid regressions: -# the linters can change between version, -# their configuration may be not compatible or their reports can be different, -# and this can break some of our tests. -# Also, some linters are not relevant for the project (e.g. linters related to SQL). -# -# We have specific constraints, so we use a specific configuration. -# -# See the file `.golangci.reference.yml` to have a list of all available configuration options. - +version: '2' linters: - disable-all: true - # This list of linters is not a recommendation (same thing for all this configuration file). - # We intentionally use a limited set of linters. - # See the comment on top of this file. + default: none enable: - bodyclose - copyloopvar @@ -26,39 +11,51 @@ linters: - funlen - gocheckcompilerdirectives - gochecknoinits - - gochecknoinits - goconst - gocritic - gocyclo - - godox - godot - - gofmt - - goimports - - mnd + - godox - goprintffuncname - gosec - - gosimple - govet - - intrange - ineffassign + - intrange - lll - misspell + - mnd - nakedret - noctx - nolintlint - revive - staticcheck - - stylecheck - testifylint - unconvert - unparam - unused - whitespace - wrapcheck - -linters-settings: - funlen: - lines: 199 - -run: - timeout: 5m + settings: + funlen: + lines: 199 + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + paths: + - third_party$ + - builtin$ + - examples$ +formatters: + enable: + - gofmt + - goimports + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/cmd/flight-processor/main.go b/cmd/flight-processor/main.go index d7d3766..5381003 100644 --- a/cmd/flight-processor/main.go +++ b/cmd/flight-processor/main.go @@ -131,15 +131,12 @@ func startBackgroundJobs(ctx context.Context, processor *service.Processor) erro func safeShutDown(ctx context.Context, processor *service.Processor, mongodb *mongo.Client) error { slog.Info("Shutting down components") - if err := processor.MessageReader.Close(); err != nil { - slog.Error("Failed to shutdown message reader", "error", err) - return fmt.Errorf("failed to shutdown message reader: %w", err) - } - if err := mongodb.Client.Disconnect(ctx); err != nil { slog.Error("Failed to shutdown MongoDB client", "error", err) return fmt.Errorf("failed to shutdown mongodb client: %w", err) } + processor.MessageReader.Close() + return nil } diff --git a/cmd/flight-reader/main.go b/cmd/flight-reader/main.go index 26ab552..c059186 100644 --- a/cmd/flight-reader/main.go +++ b/cmd/flight-reader/main.go @@ -155,14 +155,8 @@ func safeShutDown( return fmt.Errorf("failed to shutdown HTTP server: %w", err) } - // Close the HTTP client httpClient.CloseIdleConnections() + reader.Close() - if err := reader.Close(); err != nil { - slog.Error("Failed to shutdown reader", "error", err) - return fmt.Errorf("failed to shutdown reader: %w", err) - } - - // Return nil if all shutdowns were successful return nil } diff --git a/go.mod b/go.mod index a8e8fc5..c622939 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,10 @@ module github.com/ansoncht/flight-microservices -go 1.23.3 +go 1.24.3 require ( - golang.org/x/sync v0.13.0 - google.golang.org/protobuf v1.34.2 + golang.org/x/sync v0.14.0 + google.golang.org/protobuf v1.36.5 ) require ( @@ -29,8 +29,9 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect github.com/hashicorp/hcl v1.0.0 // indirect - github.com/klauspost/compress v1.17.4 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.10 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -45,7 +46,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect - github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect @@ -58,9 +59,9 @@ require ( github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.6.0 // indirect - github.com/testcontainers/testcontainers-go/modules/kafka v0.37.0 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect @@ -73,25 +74,27 @@ require ( go.opentelemetry.io/otel/trace v1.35.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/crypto v0.37.0 // indirect + golang.org/x/crypto v0.38.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/mod v0.18.0 // indirect golang.org/x/net v0.39.0 // indirect - golang.org/x/sys v0.32.0 // indirect - golang.org/x/text v0.24.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.25.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) require ( github.com/michimani/gotwi v0.16.1 - github.com/segmentio/kafka-go v0.4.47 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.37.0 + github.com/testcontainers/testcontainers-go/modules/kafka v0.37.0 github.com/testcontainers/testcontainers-go/modules/mongodb v0.37.0 + github.com/twmb/franz-go v1.19.1 + github.com/twmb/franz-go/pkg/kadm v1.16.0 go.mongodb.org/mongo-driver v1.17.1 go.uber.org/mock v0.5.2 - google.golang.org/grpc v1.68.0 + google.golang.org/grpc v1.70.0 ) diff --git a/go.sum b/go.sum index 7c4a518..9ad98da 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,11 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= +github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -12,6 +16,8 @@ github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpS github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw= github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -24,6 +30,12 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= +github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/ebitengine/purego v0.8.2 h1:jPPGWs2sZ1UgOSgD2bClL0MJIqu58nOmIcBuXr62z1I= github.com/ebitengine/purego v0.8.2/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= @@ -46,27 +58,40 @@ github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= -github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= -github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= -github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8SYxI99mE= github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/michimani/gotwi v0.16.1 h1:4VlNVDs6MB9Yonj4wSIrtxhL0kMLczG2+Zv+2wFn6N0= @@ -95,10 +120,8 @@ github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJw github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= -github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= -github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= -github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -106,14 +129,14 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= -github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= -github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/shirou/gopsutil/v4 v4.25.1 h1:QSWkTc+fu9LTAWfkZwZ6j8MSUk4A2LV7rbH0ZqmLjXs= github.com/shirou/gopsutil/v4 v4.25.1/go.mod h1:RoUCUpndaJFtT+2zsZzzmhvbfGoDCJ7nFXKJf8GqJbI= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -131,6 +154,7 @@ github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -152,6 +176,12 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/twmb/franz-go v1.19.1 h1:cOhDFUkGvUFHSQ7UYW6bO77BJa2fYEk5mA2AX+1NIdE= +github.com/twmb/franz-go v1.19.1/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= +github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE= +github.com/twmb/franz-go/pkg/kadm v1.16.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= +github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg= +github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -173,10 +203,20 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= +go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/mock v0.5.2 h1:LbtPTcP8A5k9WPXj54PPPbjcI4Y6lhyOZXn+VS7wNko= @@ -187,15 +227,13 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= -golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= +golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -204,18 +242,14 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= -golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= +golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -227,47 +261,47 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= -golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg= +golang.org/x/term v0.32.0/go.mod h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= -golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= +golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= -google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= -google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y= +google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb h1:p31xT4yrYrSM/G4Sn2+TNUkVhFCbG9y8itM2S6Th950= +google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:jbe3Bkdp+Dh2IrslsFCklNhweNTBgSYanP1UXhJDhKg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb h1:TLPQVbx1GJ8VKZxz52VAxl1EBgKXXbTiU9Fc5fZeLn4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/internal/processor/repository/flight_summary_test.go b/internal/processor/repository/flight_summary_test.go index ff0253e..e18d040 100644 --- a/internal/processor/repository/flight_summary_test.go +++ b/internal/processor/repository/flight_summary_test.go @@ -12,7 +12,11 @@ import ( "github.com/testcontainers/testcontainers-go/modules/mongodb" ) -func TestNewMongoSummaryRepository_ValidClient_ShouldSucceed(t *testing.T) { +func TestNewMongoSummaryRepository_Integration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + ctx := context.Background() // Start a MongoDB container @@ -53,7 +57,7 @@ func TestNewMongoSummaryRepository_NilClient_ShouldError(t *testing.T) { require.Nil(t, repo) } -func TestInsert__Integration(t *testing.T) { +func TestInsert_Integration(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } diff --git a/internal/processor/service/processor.go b/internal/processor/service/processor.go index 8ac84c3..2778a34 100644 --- a/internal/processor/service/processor.go +++ b/internal/processor/service/processor.go @@ -9,7 +9,7 @@ import ( repo "github.com/ansoncht/flight-microservices/internal/processor/repository" msgQueue "github.com/ansoncht/flight-microservices/pkg/kafka" "github.com/ansoncht/flight-microservices/pkg/model" - "github.com/segmentio/kafka-go" + "github.com/twmb/franz-go/pkg/kgo" "golang.org/x/sync/errgroup" ) @@ -59,7 +59,7 @@ func NewProcessor( func (p *Processor) Process(ctx context.Context) error { flights := make([]model.FlightRecord, 0) - msgChan := make(chan kafka.Message) + msgChan := make(chan kgo.Record) airport := "" g, gCtx := errgroup.WithContext(ctx) diff --git a/internal/processor/service/processor_test.go b/internal/processor/service/processor_test.go index 6d17f63..6106a3d 100644 --- a/internal/processor/service/processor_test.go +++ b/internal/processor/service/processor_test.go @@ -13,8 +13,8 @@ import ( "github.com/ansoncht/flight-microservices/internal/test/mock" msgQueue "github.com/ansoncht/flight-microservices/pkg/kafka" msg "github.com/ansoncht/flight-microservices/pkg/model" - "github.com/segmentio/kafka-go" "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/mock/gomock" ) @@ -113,7 +113,7 @@ func TestProcess_ValidMessage_ShouldSuccess(t *testing.T) { require.NoError(t, err) require.NotNil(t, flight2) - messages := []kafka.Message{ + messages := []kgo.Record{ {Key: []byte("start_of_stream"), Value: []byte("JFK")}, {Key: []byte("flight"), Value: flight1}, {Key: []byte("flight"), Value: flight2}, @@ -121,7 +121,7 @@ func TestProcess_ValidMessage_ShouldSuccess(t *testing.T) { } reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, msgChan chan<- kafka.Message) error { + func(_ context.Context, msgChan chan<- kgo.Record) error { defer close(msgChan) for _, msg := range messages { msgChan <- msg @@ -168,7 +168,7 @@ func TestProcess_MalformedMessage_ShouldSuccess(t *testing.T) { require.NoError(t, err) require.NotNil(t, flight1) - messages := []kafka.Message{ + messages := []kgo.Record{ {Key: []byte("start_of_stream"), Value: []byte("JFK")}, {Key: []byte("flight"), Value: flight1}, {Key: []byte("flight"), Value: []byte("malformed")}, @@ -176,7 +176,7 @@ func TestProcess_MalformedMessage_ShouldSuccess(t *testing.T) { } reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, msgChan chan<- kafka.Message) error { + func(_ context.Context, msgChan chan<- kgo.Record) error { defer close(msgChan) for _, msg := range messages { msgChan <- msg @@ -220,13 +220,13 @@ func TestProcess_ContextCanceledWhenRead_ShouldError(t *testing.T) { require.NotNil(t, flight) reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, msgChan chan<- kafka.Message) error { + func(ctx context.Context, msgChan chan<- kgo.Record) error { defer close(msgChan) select { case <-ctx.Done(): return ctx.Err() - case msgChan <- kafka.Message{Key: []byte("start_of_stream"), Value: []byte("JFK")}: + case msgChan <- kgo.Record{Key: []byte("start_of_stream"), Value: []byte("JFK")}: cancel() time.Sleep(10 * time.Millisecond) return ctx.Err() @@ -259,13 +259,13 @@ func TestProcess_ContextCanceled_ShouldError(t *testing.T) { require.NotNil(t, flight) reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, msgChan chan<- kafka.Message) error { + func(ctx context.Context, msgChan chan<- kgo.Record) error { defer close(msgChan) select { case <-ctx.Done(): return ctx.Err() - case msgChan <- kafka.Message{Key: []byte("start_of_stream"), Value: []byte("JFK")}: + case msgChan <- kgo.Record{Key: []byte("start_of_stream"), Value: []byte("JFK")}: cancel() time.Sleep(10 * time.Millisecond) } @@ -296,7 +296,7 @@ func TestProcess_ReaderError_ShouldError(t *testing.T) { require.NotNil(t, processor) reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, msgChan chan<- kafka.Message) error { + func(_ context.Context, msgChan chan<- kgo.Record) error { defer close(msgChan) return errors.New("test error") }, @@ -324,14 +324,14 @@ func TestProcess_SummarizeFlightsError_ShouldError(t *testing.T) { require.NoError(t, err) require.NotNil(t, flight) - messages := []kafka.Message{ + messages := []kgo.Record{ {Key: []byte("start_of_stream"), Value: []byte("JFK")}, {Key: []byte("flight"), Value: flight}, {Key: []byte("end_of_stream"), Value: []byte("2025-05-07")}, } reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, msgChan chan<- kafka.Message) error { + func(_ context.Context, msgChan chan<- kgo.Record) error { defer close(msgChan) for _, msg := range messages { msgChan <- msg @@ -364,14 +364,14 @@ func TestProcessor_Process_RepositoryInsertError(t *testing.T) { require.NoError(t, err) require.NotNil(t, flight) - messages := []kafka.Message{ + messages := []kgo.Record{ {Key: []byte("start_of_stream"), Value: []byte("JFK")}, {Key: []byte("flight"), Value: flight}, {Key: []byte("end_of_stream"), Value: []byte("2025-05-07")}, } reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, msgChan chan<- kafka.Message) error { + func(_ context.Context, msgChan chan<- kgo.Record) error { defer close(msgChan) for _, msg := range messages { msgChan <- msg @@ -415,14 +415,14 @@ func TestProcessor_Process_WriteMessageError(t *testing.T) { require.NoError(t, err) require.NotNil(t, flight) - messages := []kafka.Message{ + messages := []kgo.Record{ {Key: []byte("start_of_stream"), Value: []byte("JFK")}, {Key: []byte("flight"), Value: flight}, {Key: []byte("end_of_stream"), Value: []byte("2025-05-07")}, } reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, msgChan chan<- kafka.Message) error { + func(_ context.Context, msgChan chan<- kgo.Record) error { defer close(msgChan) for _, msg := range messages { msgChan <- msg diff --git a/internal/reader/service/reader.go b/internal/reader/service/reader.go index c4a90fe..ed3569d 100644 --- a/internal/reader/service/reader.go +++ b/internal/reader/service/reader.go @@ -52,12 +52,8 @@ func NewReader( } // Close closes the reader service. -func (r *Reader) Close() error { - if err := r.messageWriter.Close(); err != nil { - return fmt.Errorf("failed to close message writer: %w", err) - } - - return nil +func (r *Reader) Close() { + r.messageWriter.Close() } func (r *Reader) HTTPHandler(w http.ResponseWriter, req *http.Request) { diff --git a/internal/reader/service/reader_test.go b/internal/reader/service/reader_test.go index 773a15a..0886ed8 100644 --- a/internal/reader/service/reader_test.go +++ b/internal/reader/service/reader_test.go @@ -304,28 +304,10 @@ func TestClose_ValidAction_ShouldSucceed(t *testing.T) { mKafka := mock.NewMockMessageWriter(ctrl) - mKafka.EXPECT().Close().Return(nil) + mKafka.EXPECT().Close().Return() reader, err := service.NewReader(&client.FlightAPI{}, &client.RouteAPI{}, mKafka) require.NoError(t, err) require.NotNil(t, reader) - - err = reader.Close() - require.NoError(t, err) -} - -func TestClose_MessageWriterError_ShouldError(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mKafka := mock.NewMockMessageWriter(ctrl) - - mKafka.EXPECT().Close().Return(errors.New("error")) - - reader, err := service.NewReader(&client.FlightAPI{}, &client.RouteAPI{}, mKafka) - require.NoError(t, err) - require.NotNil(t, reader) - - err = reader.Close() - require.ErrorContains(t, err, "failed to close message writer") + defer reader.Close() } diff --git a/internal/test/mock/mock_reader.go b/internal/test/mock/mock_kafka_reader.go similarity index 89% rename from internal/test/mock/mock_reader.go rename to internal/test/mock/mock_kafka_reader.go index a22bef7..74b55fa 100644 --- a/internal/test/mock/mock_reader.go +++ b/internal/test/mock/mock_kafka_reader.go @@ -3,7 +3,7 @@ // // Generated by this command: // -// mockgen -source pkg/kafka/reader.go -destination=internal/test/mock/mock_reader.go -package=mock +// mockgen -source pkg/kafka/reader.go -destination=internal/test/mock/mock_kafka_reader.go -package=mock // // Package mock is a generated GoMock package. @@ -13,7 +13,7 @@ import ( context "context" reflect "reflect" - kafka "github.com/segmentio/kafka-go" + kgo "github.com/twmb/franz-go/pkg/kgo" gomock "go.uber.org/mock/gomock" ) @@ -42,11 +42,9 @@ func (m *MockMessageReader) EXPECT() *MockMessageReaderMockRecorder { } // Close mocks base method. -func (m *MockMessageReader) Close() error { +func (m *MockMessageReader) Close() { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "Close") } // Close indicates an expected call of Close. @@ -56,7 +54,7 @@ func (mr *MockMessageReaderMockRecorder) Close() *gomock.Call { } // ReadMessages mocks base method. -func (m *MockMessageReader) ReadMessages(ctx context.Context, msgChan chan<- kafka.Message) error { +func (m *MockMessageReader) ReadMessages(ctx context.Context, msgChan chan<- kgo.Record) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ReadMessages", ctx, msgChan) ret0, _ := ret[0].(error) diff --git a/internal/test/mock/mock_writer.go b/internal/test/mock/mock_kafka_writer.go similarity index 92% rename from internal/test/mock/mock_writer.go rename to internal/test/mock/mock_kafka_writer.go index 41dfdce..a0e0a83 100644 --- a/internal/test/mock/mock_writer.go +++ b/internal/test/mock/mock_kafka_writer.go @@ -3,7 +3,7 @@ // // Generated by this command: // -// mockgen -source pkg/kafka/writer.go -destination=internal/test/mock/mock_writer.go -package=mock +// mockgen -source pkg/kafka/writer.go -destination=internal/test/mock/mock_kafka_writer.go -package=mock // // Package mock is a generated GoMock package. @@ -41,11 +41,9 @@ func (m *MockMessageWriter) EXPECT() *MockMessageWriterMockRecorder { } // Close mocks base method. -func (m *MockMessageWriter) Close() error { +func (m *MockMessageWriter) Close() { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "Close") } // Close indicates an expected call of Close. diff --git a/pkg/kafka/reader.go b/pkg/kafka/reader.go index b8799c5..f6d541b 100644 --- a/pkg/kafka/reader.go +++ b/pkg/kafka/reader.go @@ -4,10 +4,14 @@ import ( "context" "errors" "fmt" - "io" "log/slog" + "time" - "github.com/segmentio/kafka-go" + "github.com/twmb/franz-go/pkg/kgo" +) + +const ( + pollInterval = 5 * time.Second ) // ReaderConfig holds configuration settings for the Kafka reader. @@ -23,15 +27,15 @@ type ReaderConfig struct { // MessageReader defines the interface for reading messages from a message queue. type MessageReader interface { // ReadMessages reads messages from the message queue. - ReadMessages(ctx context.Context, msgChan chan<- kafka.Message) error + ReadMessages(ctx context.Context, msgChan chan<- kgo.Record) error // Close closes the message queue reader. - Close() error + Close() } // Reader holds the Kafka reader instance. type Reader struct { - // KafkaReader specifies the kafka reader instance. - KafkaReader *kafka.Reader + // Client specifies the kafka client instance. + Client *kgo.Client } // NewKafkaReader creates a new Reader instance based on the provided configuration. @@ -55,29 +59,28 @@ func NewKafkaReader(cfg ReaderConfig) (*Reader, error) { return nil, fmt.Errorf("kafka group ID is empty") } + opts := []kgo.Opt{ + kgo.SeedBrokers([]string{cfg.Address}...), + kgo.ConsumerGroup(cfg.GroupID), + kgo.ConsumeTopics(cfg.Topic), + } + client, err := kgo.NewClient(opts...) + if err != nil { + return nil, fmt.Errorf("failed to create Kafka client: %w", err) + } + return &Reader{ - KafkaReader: kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{cfg.Address}, - Topic: cfg.Topic, - }), + Client: client, }, nil } // Close closes the Kafka reader. -func (r *Reader) Close() error { - if r == nil { - return nil - } - - if err := r.KafkaReader.Close(); err != nil { - return fmt.Errorf("failed to close Kafka reader: %w", err) - } - - return nil +func (r *Reader) Close() { + r.Client.Close() } // ReadMessages reads messages from the Kafka topic and sends them to the provided channel. -func (r *Reader) ReadMessages(ctx context.Context, msgChan chan<- kafka.Message) error { +func (r *Reader) ReadMessages(ctx context.Context, msgChan chan<- kgo.Record) error { slog.Info("Reading message from Kafka topic") defer close(msgChan) @@ -86,47 +89,50 @@ func (r *Reader) ReadMessages(ctx context.Context, msgChan chan<- kafka.Message) return fmt.Errorf("kafka reader is nil") } + // Poll every 5 seconds + pollTicker := time.NewTicker(pollInterval) + defer pollTicker.Stop() + readingLoop: for { select { case <-ctx.Done(): break readingLoop - default: - // Read a message from Kafka - message, err := r.KafkaReader.ReadMessage(ctx) - - if err != nil { - if errors.Is(err, io.EOF) { - slog.Info("Kafka reader reached EOF") - continue - } - if errors.Is(err, context.Canceled) { - slog.Info("Context canceled during ReadMessage") - break readingLoop + case <-pollTicker.C: + fetches := r.Client.PollFetches(ctx) + + if errs := fetches.Errors(); len(errs) > 0 { + for _, err := range errs { + if errors.Is(err.Err, context.Canceled) { + break readingLoop + } + + if err.Topic != "" || err.Partition != -1 || err.Err != nil { + slog.Error("Failed to fetch message from Kafka", "errors", err) + } } - return fmt.Errorf("failed to read message from Kafka: %w", err) } - // Send the message to the channel - select { - case msgChan <- message: - slog.Debug( - "Message sent to channel", - "topic", message.Topic, - "partition", message.Partition, - "offset", message.Offset, - "key", string(message.Key), - "value", string(message.Value), - ) - case <-ctx.Done(): - slog.Info("Context is done, stopping message reading") - break readingLoop - } + // Process fetched messages + fetches.EachPartition(func(p kgo.FetchTopicPartition) { + for _, record := range p.Records { + select { + case <-ctx.Done(): + return + case msgChan <- *record: + r.Client.MarkCommitRecords(record) + } + } + }) } } + if err := r.Client.CommitUncommittedOffsets(ctx); err != nil { + return fmt.Errorf("failed to commit offsets: %w", err) + } + if ctx.Err() != nil { - return fmt.Errorf("context canceled while reading kafka messages: %w", ctx.Err()) + return fmt.Errorf("context canceled while fetching kafka messages: %w", ctx.Err()) } return nil diff --git a/pkg/kafka/reader_test.go b/pkg/kafka/reader_test.go index c392d3a..abb0669 100644 --- a/pkg/kafka/reader_test.go +++ b/pkg/kafka/reader_test.go @@ -7,9 +7,10 @@ import ( "time" msgQueue "github.com/ansoncht/flight-microservices/pkg/kafka" - kafkago "github.com/segmentio/kafka-go" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/modules/kafka" + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kgo" ) const ( @@ -19,7 +20,7 @@ const ( testGroupID = "test-group" testMessageKey = "test-key" testMessageValue = "hello, world" - readTimeout = 10 * time.Second + timeout = 10 * time.Second ) // setupKafkaTest spins up a Kafka container for testing and returns its brokers and a cleanup function. @@ -42,12 +43,20 @@ func setupKafkaTest(ctx context.Context, t *testing.T) (brokers []string, cleanu require.NoError(t, err) require.NotEmpty(t, brokers) - conn, err := kafkago.DialLeader(context.Background(), "tcp", brokers[0], testTopic, 0) + client, err := kgo.NewClient( + kgo.SeedBrokers(brokers...), + ) require.NoError(t, err) - defer func() { - err := conn.Close() - require.NoError(t, err) - }() + defer client.Close() + + admin := kadm.NewClient(client) + + ctxTimeout, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + createTopicResponse, err := admin.CreateTopic(ctxTimeout, -1, -1, nil, testTopic) + require.NoError(t, err) + require.NotNil(t, createTopicResponse) return brokers, cleanup } @@ -123,43 +132,34 @@ func TestReadMessages_Integration(t *testing.T) { defer cleanup() brokerAddress := brokers[0] - writer := &kafkago.Writer{ - Addr: kafkago.TCP(brokerAddress), - Topic: testTopic, - RequiredAcks: kafkago.RequireOne, - Async: false, - BatchTimeout: 100 * time.Millisecond, + + wCfg := msgQueue.WriterConfig{ + Address: brokerAddress, + Topic: testTopic, } - defer func() { - err := writer.Close() - require.NoError(t, err) - }() + writer, err := msgQueue.NewKafkaWriter(wCfg) + require.NoError(t, err) + require.NotNil(t, writer) + defer writer.Close() writeCtx, writeCancel := context.WithTimeout(ctx, 5*time.Second) defer writeCancel() - testMsg := kafkago.Message{ - Key: []byte(testMessageKey), - Value: []byte(testMessageValue), - } - err := writer.WriteMessages(writeCtx, testMsg) + err = writer.WriteMessage(writeCtx, []byte(testMessageKey), []byte(testMessageValue)) require.NoError(t, err) - cfg := msgQueue.ReaderConfig{ + rCfg := msgQueue.ReaderConfig{ Address: brokerAddress, Topic: testTopic, GroupID: testGroupID, } - reader, err := msgQueue.NewKafkaReader(cfg) + reader, err := msgQueue.NewKafkaReader(rCfg) require.NoError(t, err) require.NotNil(t, reader) - defer func() { - err := reader.Close() - require.NoError(t, err) - }() + defer reader.Close() t.Run("Successful ReadMessages", func(t *testing.T) { - msgChan := make(chan kafkago.Message, 1) + msgChan := make(chan kgo.Record, 1) readErrChan := make(chan error, 1) readCtx, readCancel := context.WithCancel(ctx) defer readCancel() @@ -170,11 +170,11 @@ func TestReadMessages_Integration(t *testing.T) { select { case msg := <-msgChan: - require.Equal(t, testMsg.Key, msg.Key) - require.Equal(t, testMsg.Value, msg.Value) + require.Equal(t, testMessageKey, string(msg.Key)) + require.Equal(t, testMessageValue, string(msg.Value)) case err := <-readErrChan: require.NoError(t, err) - case <-time.After(readTimeout): + case <-time.After(timeout): t.Fatal("timed out waiting for message from Kafka") } @@ -190,7 +190,7 @@ func TestReadMessages_Integration(t *testing.T) { }) t.Run("Nil Reader", func(t *testing.T) { - msgChan := make(chan kafkago.Message, 1) + msgChan := make(chan kgo.Record, 1) readCtx, readCancel := context.WithCancel(ctx) defer readCancel() @@ -203,38 +203,8 @@ func TestReadMessages_Integration(t *testing.T) { cancelCtx, cancel := context.WithCancel(context.Background()) cancel() - msgChan := make(chan kafkago.Message) + msgChan := make(chan kgo.Record) err = reader.ReadMessages(cancelCtx, msgChan) require.ErrorIs(t, err, context.Canceled) }) } - -func TestReaderClose_Integration(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - - ctx := context.Background() - brokers, cleanup := setupKafkaTest(ctx, t) - defer cleanup() - - t.Run("Successful Close", func(t *testing.T) { - cfg := msgQueue.ReaderConfig{ - Address: brokers[0], - Topic: testTopic, - GroupID: testGroupID, - } - reader, err := msgQueue.NewKafkaReader(cfg) - require.NoError(t, err) - require.NotNil(t, reader) - - err = reader.Close() - require.NoError(t, err) - }) - - t.Run("Nil Reader", func(t *testing.T) { - var reader *msgQueue.Reader - err := reader.Close() - require.NoError(t, err) - }) -} diff --git a/pkg/kafka/writer.go b/pkg/kafka/writer.go index 122e7be..f76968d 100644 --- a/pkg/kafka/writer.go +++ b/pkg/kafka/writer.go @@ -5,7 +5,7 @@ import ( "fmt" "log/slog" - "github.com/segmentio/kafka-go" + "github.com/twmb/franz-go/pkg/kgo" ) // WriterConfig holds configuration settings for the Kafka writer. @@ -21,14 +21,14 @@ type MessageWriter interface { // WriteMessage writes a message to the message queue. WriteMessage(ctx context.Context, key []byte, value []byte) error // Close closes the message queue writer. - Close() error + Close() } // Writer holds the Kafka writer instance. // It implements the MessageWriter interface to provide methods for writing messages to Kafka. type Writer struct { - // KafkaWriter specifies the kafka writer instance. - KafkaWriter *kafka.Writer + // Client specifies the kafka reader instance. + Client *kgo.Client } // NewKafkaWriter creates a new Writer instance with the provided configuration. @@ -43,25 +43,23 @@ func NewKafkaWriter(cfg WriterConfig) (*Writer, error) { return nil, fmt.Errorf("kafka topic is empty") } + opts := []kgo.Opt{ + kgo.SeedBrokers([]string{cfg.Address}...), + kgo.DefaultProduceTopic(cfg.Topic), + } + client, err := kgo.NewClient(opts...) + if err != nil { + return nil, fmt.Errorf("failed to create Kafka client: %w", err) + } + return &Writer{ - KafkaWriter: &kafka.Writer{ - Addr: kafka.TCP(cfg.Address), - Topic: cfg.Topic, - }, + Client: client, }, nil } // Close closes the Kafka writer. -func (w *Writer) Close() error { - if w == nil { - return nil - } - - if err := w.KafkaWriter.Close(); err != nil { - return fmt.Errorf("failed to close Kafka writer: %w", err) - } - - return nil +func (w *Writer) Close() { + w.Client.Close() } // WriteMessage writes a message to the Kafka topic. @@ -80,14 +78,27 @@ func (w *Writer) WriteMessage(ctx context.Context, key []byte, value []byte) err return fmt.Errorf("message value is nil or empty") } - msg := kafka.Message{ + record := &kgo.Record{ Key: key, Value: value, } - if err := w.KafkaWriter.WriteMessages(ctx, msg); err != nil { - return fmt.Errorf("failed to write message to Kafka topic %s: %w", w.KafkaWriter.Topic, err) - } + errChan := make(chan error, 1) - return nil + w.Client.Produce(ctx, record, func(_ *kgo.Record, err error) { + if err != nil { + errChan <- err + } else { + errChan <- nil + } + + close(errChan) + }) + + select { + case err := <-errChan: + return err + case <-ctx.Done(): + return fmt.Errorf("context canceled while producing message: %w", ctx.Err()) + } } diff --git a/pkg/kafka/writer_test.go b/pkg/kafka/writer_test.go index 26d8ae8..45d4a66 100644 --- a/pkg/kafka/writer_test.go +++ b/pkg/kafka/writer_test.go @@ -6,8 +6,8 @@ import ( "time" msgQueue "github.com/ansoncht/flight-microservices/pkg/kafka" - kafkago "github.com/segmentio/kafka-go" "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kgo" ) func TestNewKafkaWriter_ValidConfig_ShouldSucceed(t *testing.T) { @@ -70,19 +70,15 @@ func TestWriteMessage_Integration(t *testing.T) { defer cleanup() brokerAddress := brokers[0] - writer := &msgQueue.Writer{ - KafkaWriter: &kafkago.Writer{ - Addr: kafkago.TCP(brokerAddress), - Topic: testTopic, - RequiredAcks: kafkago.RequireOne, - Async: false, - BatchTimeout: 100 * time.Millisecond, - }, + + cfg := msgQueue.WriterConfig{ + Address: brokerAddress, + Topic: testTopic, } - defer func() { - err := writer.Close() - require.NoError(t, err) - }() + writer, err := msgQueue.NewKafkaWriter(cfg) + require.NoError(t, err) + require.NotNil(t, writer) + defer writer.Close() t.Run("Successful WriteMessage", func(t *testing.T) { writeCtx, writeCancel := context.WithTimeout(ctx, 5*time.Second) @@ -99,14 +95,11 @@ func TestWriteMessage_Integration(t *testing.T) { reader, err := msgQueue.NewKafkaReader(cfg) require.NoError(t, err) require.NotNil(t, reader) - defer func() { - err := reader.Close() - require.NoError(t, err) - }() + defer reader.Close() - msgChan := make(chan kafkago.Message, 1) + msgChan := make(chan kgo.Record, 1) readErrChan := make(chan error, 1) - readCtx, readCancel := context.WithTimeout(ctx, readTimeout) + readCtx, readCancel := context.WithTimeout(ctx, timeout) defer readCancel() go func() { @@ -148,32 +141,3 @@ func TestWriteMessage_Integration(t *testing.T) { require.ErrorIs(t, err, context.Canceled) }) } - -func TestWriterClose_Integration(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - - ctx := context.Background() - brokers, cleanup := setupKafkaTest(ctx, t) - defer cleanup() - - t.Run("Successful Close", func(t *testing.T) { - cfg := msgQueue.WriterConfig{ - Address: brokers[0], - Topic: testTopic, - } - writer, err := msgQueue.NewKafkaWriter(cfg) - require.NoError(t, err) - require.NotNil(t, writer) - - err = writer.Close() - require.NoError(t, err) - }) - - t.Run("Nil Reader", func(t *testing.T) { - var writer *msgQueue.Writer - err := writer.Close() - require.NoError(t, err) - }) -} diff --git a/pkg/mongo/client_test.go b/pkg/mongo/client_test.go index 62efe9c..6f13802 100644 --- a/pkg/mongo/client_test.go +++ b/pkg/mongo/client_test.go @@ -10,7 +10,11 @@ import ( "github.com/testcontainers/testcontainers-go/modules/mongodb" ) -func TestNewMongoClient_ValidConfig_ShouldSucceed(t *testing.T) { +func TestNewMongoClient_Integration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + ctx := context.Background() // Start a MongoDB container