diff --git a/cmd/flyscrape/main.go b/cmd/flyscrape/main.go index 7419f6e..e352163 100644 --- a/cmd/flyscrape/main.go +++ b/cmd/flyscrape/main.go @@ -11,6 +11,7 @@ import ( "os" "github.com/philippta/flyscrape/cmd" + _ "github.com/philippta/flyscrape/modules/browser" _ "github.com/philippta/flyscrape/modules/cache" _ "github.com/philippta/flyscrape/modules/cookies" @@ -19,6 +20,7 @@ import ( _ "github.com/philippta/flyscrape/modules/followlinks" _ "github.com/philippta/flyscrape/modules/headers" _ "github.com/philippta/flyscrape/modules/output/json" + _ "github.com/philippta/flyscrape/modules/output/mongodb" _ "github.com/philippta/flyscrape/modules/output/ndjson" _ "github.com/philippta/flyscrape/modules/proxy" _ "github.com/philippta/flyscrape/modules/ratelimit" diff --git a/examples/coinmarketcap_mongodb.js b/examples/coinmarketcap_mongodb.js new file mode 100644 index 0000000..d515e0e --- /dev/null +++ b/examples/coinmarketcap_mongodb.js @@ -0,0 +1,20 @@ +export const config = { + url: "https://coinmarketcap.com/", + follow: ["a[href]"], + depth: 1, + output: { + mongodb: { + uri: "mongodb://localhost:27017", + database: "test", + collection: "coinmarketcap", + }, + }, +}; + +export default function ({ doc }) { + const title = doc.find("title"); + + return { + title: title.text(), + }; +} diff --git a/go.mod b/go.mod index 838ad5f..3e400bf 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,16 @@ require ( golang.org/x/sync v0.9.0 ) +require ( + github.com/golang/snappy v0.0.4 // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect +) + require ( github.com/Velocidex/json v0.0.0-20220224052537-92f3c0326e5a // indirect github.com/Velocidex/ordereddict v0.0.0-20230909174157-2aa49cc5d11d // indirect @@ -48,6 +58,7 @@ require ( github.com/ysmood/gson v0.7.3 // indirect github.com/ysmood/leakless v0.8.0 // indirect github.com/zalando/go-keyring v0.2.5 // indirect + go.mongodb.org/mongo-driver v1.17.2 golang.org/x/crypto v0.29.0 // indirect golang.org/x/net v0.31.0 // indirect golang.org/x/sys v0.27.0 // indirect diff --git a/go.sum b/go.sum index 6e74da2..14f71fd 100644 --- a/go.sum +++ b/go.sum @@ -54,8 +54,12 @@ github.com/go-sqlite/sqlite3 v0.0.0-20180313105335-53dd8e640ee7 h1:ow5vK9Q/DSKkx github.com/go-sqlite/sqlite3 v0.0.0-20180313105335-53dd8e640ee7/go.mod h1:JxSQ+SvsjFb+p8Y+bn+GhTkiMfKVGBD0fq43ms2xw04= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gonuts/binary v0.2.0 h1:caITwMWAoQWlL0RNvv2lTU/AHqAJlVuu6nZmNgfbKW4= github.com/gonuts/binary v0.2.0/go.mod h1:kM+CtBrCGDSKdv8WXTuCUsw+loiy8f/QEI8YCCC0M/E= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U= github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= @@ -63,6 +67,8 @@ github.com/inancgumus/screen v0.0.0-20190314163918-06e984b86ed3 h1:fO9A67/izFYFY github.com/inancgumus/screen v0.0.0-20190314163918-06e984b86ed3/go.mod h1:Ey4uAp+LvIl+s5jRbOHLcZpUDnkjLBROl15fZLwPlTM= github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6 h1:IsMZxCuZqKuao2vNdfD82fjjgPLfyHLpR41Z88viRWs= github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6/go.mod h1:3VeWNIJaW+O5xpRQbPp0Ybqu1vJd/pm7s2F473HRrkw= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= @@ -73,6 +79,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/nlnwa/whatwg-url v0.4.0 h1:B3kFb5EL7KILeBkhrlQvFi41Ex0p4ropVA9brt5ungI= github.com/nlnwa/whatwg-url v0.4.0/go.mod h1:pLzpJjFPtA+n7RCLvp0GBxvDHa/2ckNCBK9mfEeNOMQ= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -106,6 +114,14 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/ysmood/fetchup v0.2.3 h1:ulX+SonA0Vma5zUFXtv52Kzip/xe7aj4vqT5AJwQ+ZQ= github.com/ysmood/fetchup v0.2.3/go.mod h1:xhibcRKziSvol0H1/pj33dnKrYyI2ebIvz5cOOkYGns= github.com/ysmood/goob v0.4.0 h1:HsxXhyLBeGzWXnqVKtmT9qM7EuVs/XOgkX7T6r1o1AQ= @@ -125,6 +141,8 @@ github.com/zalando/go-keyring v0.2.5 h1:Bc2HHpjALryKD62ppdEzaFG6VxL6Bc+5v0LYpN8L github.com/zalando/go-keyring v0.2.5/go.mod h1:HL4k+OXQfJUWaMnqyuSOc0drfGPX2b51Du6K+MRgZMk= go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0= go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= +go.mongodb.org/mongo-driver v1.17.2 h1:gvZyk8352qSfzyZ2UMWcpDpMSGEr1eqE4T793SqyhzM= +go.mongodb.org/mongo-driver v1.17.2/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= diff --git a/modules/output/mongodb/mongodb.go b/modules/output/mongodb/mongodb.go new file mode 100644 index 0000000..3ed7c9e --- /dev/null +++ b/modules/output/mongodb/mongodb.go @@ -0,0 +1,231 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package mongodb + +import ( + "context" + "errors" + "log" + "os" + "sync" + "time" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" + + "github.com/philippta/flyscrape" +) + +var ( + DefaultMaxPoolSize = 100 + DefaultBatchSize = 100 + DefaultFlushInterval = 10 * time.Second + DefaultTimeout = 30 * time.Second + DefaultMaxRetries = 3 +) + +func init() { + flyscrape.RegisterModule(&Module{}) +} + +type Module struct { + SourceId string `json:"sourceId"` + Output struct { + MongoDB struct { + URI string `json:"uri"` + Database string `json:"database"` + Collection string `json:"collection"` + MaxPoolSize int `json:"maxPoolSize,omitempty"` + } `json:"mongodb"` + } `json:"output"` + Concurrency int `json:"concurrency"` + + client *mongo.Client + collection *mongo.Collection + maxPoolSize int + + buf []interface{} + mu *sync.Mutex + + ticker *time.Ticker + done chan struct{} + concurrency chan struct{} +} + +func (m *Module) ModuleInfo() flyscrape.ModuleInfo { + return flyscrape.ModuleInfo{ + ID: "output.mongodb", + New: func() flyscrape.Module { return new(Module) }, + } +} + +func (m *Module) Provision(ctx flyscrape.Context) { + if m.disabled() { + return + } + + m.mu = &sync.Mutex{} + + m.maxPoolSize = DefaultMaxPoolSize + if m.Output.MongoDB.MaxPoolSize != 0 { + m.maxPoolSize = m.Output.MongoDB.MaxPoolSize + } + + if m.concurrencyEnabled() { + m.concurrency = make(chan struct{}, m.Concurrency) + for i := 0; i < m.Concurrency; i++ { + m.concurrency <- struct{}{} + } + } + + ctxTimeout, cancel := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancel() + client, err := mongo.Connect(ctxTimeout, options.Client().ApplyURI(m.Output.MongoDB.URI).SetMaxPoolSize(uint64(m.maxPoolSize))) + + if err != nil { + log.Printf("failed to connect to MongoDB: %v", err) + os.Exit(1) + } + + if err := client.Ping(ctxTimeout, readpref.Primary()); err != nil { + log.Printf("failed to ping MongoDB: %v", err) + os.Exit(1) + } + + m.client = client + m.collection = client.Database(m.Output.MongoDB.Database).Collection(m.Output.MongoDB.Collection) + m.buf = make([]interface{}, 0, DefaultBatchSize) + m.done = make(chan struct{}) + m.ticker = time.NewTicker(DefaultFlushInterval) + + go func() { + for { + select { + case <-m.ticker.C: + m.flushBuffer() + case <-m.done: + return + } + } + }() +} + +func (m *Module) ReceiveResponse(resp *flyscrape.Response) { + if m.disabled() { + return + } + + if resp.Data == nil && resp.Error == nil { + return + } + + o := output{ + URL: resp.Request.URL, + Data: resp.Data, + Timestamp: time.Now(), + } + if resp.Error != nil { + o.Error = resp.Error.Error() + } + + m.mu.Lock() + defer m.mu.Unlock() + + m.buf = append(m.buf, o) + if len(m.buf) >= DefaultBatchSize { + go m.flushBuffer() + } +} + +func (m *Module) Finalize() { + if m.disabled() { + return + } + + m.ticker.Stop() + + ctxTimeout, cancel := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancel() + + done := make(chan struct{}) + go func() { + m.flushBuffer() + close(done) + }() + select { + case <-done: + case <-ctxTimeout.Done(): + } + + if err := m.client.Disconnect(ctxTimeout); err != nil { + log.Printf("failed to disconnect from MongoDB: %v", err) + } + + close(m.done) +} + +func (m *Module) disabled() bool { + return m.Output.MongoDB.URI == "" || m.Output.MongoDB.Database == "" || m.Output.MongoDB.Collection == "" +} + +type output struct { + URL string `json:"url,omitempty"` + Data any `json:"data,omitempty"` + Error string `json:"error,omitempty"` + Timestamp time.Time `json:"timestamp,omitempty"` +} + +func (m *Module) flushBuffer() { + if m.concurrencyEnabled() { + <-m.concurrency + defer func() { m.concurrency <- struct{}{} }() + } + m.mu.Lock() + defer m.mu.Unlock() + + if len(m.buf) == 0 { + return + } + + var err error + var res *mongo.InsertManyResult + + for i := 0; i < DefaultMaxRetries; i++ { + ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancel() + + log.Printf("attempt %d to insert %d documents to MongoDB", i+1, len(m.buf)) + res, err = m.collection.InsertMany(ctx, m.buf) + if err == nil { + log.Printf("successfully inserted %d documents to MongoDB", len(res.InsertedIDs)) + m.buf = m.buf[:0] + return + } + + if errors.Is(err, context.DeadlineExceeded) { + log.Printf("operation timed out, retrying...") + continue + } + + log.Printf("failed to insert documents to MongoDB: %v", err) + break + } + + if err != nil { + log.Printf("failed to insert %d documents after %d retries: %v", len(m.buf), DefaultMaxRetries, err) + } + m.buf = m.buf[:0] +} + +func (m *Module) concurrencyEnabled() bool { + return m.Concurrency > 0 +} + +var ( + _ flyscrape.Provisioner = (*Module)(nil) + _ flyscrape.ResponseReceiver = (*Module)(nil) + _ flyscrape.Finalizer = (*Module)(nil) +)