diff --git a/src/DesignTime/DesignTime.fsproj b/src/DesignTime/DesignTime.fsproj index 7a21886..0aca126 100644 --- a/src/DesignTime/DesignTime.fsproj +++ b/src/DesignTime/DesignTime.fsproj @@ -10,8 +10,7 @@ true true true - true - 1182 + false preview diff --git a/src/DesignTime/NpgsqlConnectionProvider.fs b/src/DesignTime/NpgsqlConnectionProvider.fs index 5cead74..1538f8b 100644 --- a/src/DesignTime/NpgsqlConnectionProvider.fs +++ b/src/DesignTime/NpgsqlConnectionProvider.fs @@ -11,14 +11,15 @@ open System.Collections.Concurrent open System.Reflection let mutable cacheInstanceCount = 0 -let methodsCache = ConcurrentDictionary () +let methodsCache = ConcurrentDictionary () let typeCache = ConcurrentDictionary () let schemaCache = ConcurrentDictionary () let addCreateCommandMethod(connectionString, rootType: ProvidedTypeDefinition, commands: ProvidedTypeDefinition, customTypes: Map, dbSchemaLookups: DbSchemaLookups, globalXCtor, globalPrepare: bool, - providedTypeReuse, methodTypes, globalCollectionType: CollectionType, globalCommandTimeout : int) = + providedTypeReuse, methodTypes, globalCollectionType: CollectionType, globalCommandTimeout: int, + globalTries: int, globalRetryWaitTime: int, globalAsyncChoice: bool) = let staticParams = [ @@ -31,21 +32,23 @@ let addCreateCommandMethod(connectionString, rootType: ProvidedTypeDefinition, if not globalXCtor then yield ProvidedStaticParameter("XCtor", typeof, false) yield ProvidedStaticParameter("Prepare", typeof, globalPrepare) yield ProvidedStaticParameter("CommandTimeout", typeof, globalCommandTimeout) + yield ProvidedStaticParameter("Tries", typeof, globalTries) + yield ProvidedStaticParameter("RetryWaitTime", typeof, globalRetryWaitTime) ] let m = ProvidedMethod("CreateCommand", [], typeof, isStatic = true) m.DefineStaticParameters(staticParams, (fun methodName args -> - let sqlStatement, resultType, collectionType, singleRow, allParametersOptional, typename, xctor, (prepare: bool), (commandTimeout : int) = + let sqlStatement, resultType, collectionType, singleRow, allParametersOptional, typename, xctor, (prepare: bool), (commandTimeout: int), (tries: int), (retryWaitTime: int) = if not globalXCtor then - args.[0] :?> _ , args.[1] :?> _, args.[2] :?> _, args.[3] :?> _, args.[4] :?> _, args.[5] :?> _, args.[6] :?> _, args.[7] :?> _, args.[8] :?> _ + args.[0] :?> _ , args.[1] :?> _, args.[2] :?> _, args.[3] :?> _, args.[4] :?> _, args.[5] :?> _, args.[6] :?> _, args.[7] :?> _, args.[8] :?> _, args.[9] :?> _, args.[10] :?> _ else - args.[0] :?> _ , args.[1] :?> _, args.[2] :?> _, args.[3] :?> _, args.[4] :?> _, args.[5] :?> _, true, args.[6] :?> _, args.[7] :?> _ + args.[0] :?> _ , args.[1] :?> _, args.[2] :?> _, args.[3] :?> _, args.[4] :?> _, args.[5] :?> _, true, args.[6] :?> _, args.[7] :?> _, args.[8] :?> _, args.[9] :?> _ //let methodName = Regex.Replace(methodName, @"\s+", " ", RegexOptions.Multiline).Replace("\"", "").Replace("@", ":").Replace("CreateCommand,CommandText=", "").Trim() let commandTypeName = if typename <> "" then typename else methodName methodsCache.GetOrAdd( - commandTypeName, + (commandTypeName, globalAsyncChoice), fun _ -> if singleRow && not (resultType = ResultType.Records || resultType = ResultType.Tuples) then invalidArg "SingleRow" "SingleRow can be set only for ResultType.Records or ResultType.Tuples." @@ -66,13 +69,15 @@ let addCreateCommandMethod(connectionString, rootType: ProvidedTypeDefinition, collectionType, singleRow, (if statements.Length > 1 then (i + 1).ToString () else ""), - providedTypeReuse)) + providedTypeReuse, + globalAsyncChoice)) let cmdProvidedType = ProvidedTypeDefinition (commandTypeName, Some typeof, hideObjectMethods = true) commands.AddMember cmdProvidedType QuotationsFactory.AddTopLevelTypes cmdProvidedType parameters resultType methodTypes customTypes statements (if resultType <> ResultType.Records || providedTypeReuse = NoReuse then cmdProvidedType else rootType) + globalAsyncChoice let designTimeConfig = Expr.Lambda (Var ("x", typeof), @@ -85,6 +90,8 @@ let addCreateCommandMethod(connectionString, rootType: ProvidedTypeDefinition, QuotationsFactory.BuildDataColumnsExpr (statements, resultType <> ResultType.DataTable) Expr.Value prepare Expr.Value commandTimeout + Expr.Value tries + Expr.Value retryWaitTime ])) let method = QuotationsFactory.GetCommandFactoryMethod (cmdProvidedType, designTimeConfig, xctor, commandTypeName) @@ -160,7 +167,7 @@ let createTableTypes(customTypes : Map, item: Db tables -let createRootType (assembly, nameSpace: string, typeName, connectionString, xctor, prepare, reuseProvidedTypes, methodTypes, collectionType, commandTimeout) = +let createRootType (assembly, nameSpace: string, typeName, connectionString, xctor, prepare, reuseProvidedTypes, methodTypes, collectionType, commandTimeout, tries, retryWaitTime, asyncChoice) = if String.IsNullOrWhiteSpace connectionString then invalidArg "Connection" "Value is empty!" let databaseRootType = ProvidedTypeDefinition (assembly, nameSpace, typeName, baseType = Some typeof, hideObjectMethods = true) @@ -191,7 +198,7 @@ let createRootType (assembly, nameSpace: string, typeName, connectionString, xct let commands = ProvidedTypeDefinition("Commands", None) databaseRootType.AddMember commands let providedTypeReuse = if reuseProvidedTypes then WithCache typeCache else NoReuse - addCreateCommandMethod (connectionString, databaseRootType, commands, customTypes, schemaLookups, xctor, prepare, providedTypeReuse, methodTypes, collectionType, commandTimeout) + addCreateCommandMethod (connectionString, databaseRootType, commands, customTypes, schemaLookups, xctor, prepare, providedTypeReuse, methodTypes, collectionType, commandTimeout, tries, retryWaitTime, asyncChoice) databaseRootType @@ -208,8 +215,11 @@ let internal getProviderType (assembly, nameSpace) = ProvidedStaticParameter("MethodTypes", typeof, MethodTypes.Sync ||| MethodTypes.Async) ProvidedStaticParameter("CollectionType", typeof, CollectionType.List) ProvidedStaticParameter("CommandTimeout", typeof, 0) + ProvidedStaticParameter("Tries", typeof, 1) + ProvidedStaticParameter("RetryWaitTime", typeof, 1000) + ProvidedStaticParameter("AsyncChoice", typeof, false) ], - fun typeName args -> typeCache.GetOrAdd (typeName, fun typeName -> createRootType (assembly, nameSpace, typeName, unbox args.[0], unbox args.[1], unbox args.[2], unbox args.[3], unbox args.[4], unbox args.[5], unbox args.[6]))) + fun typeName args -> typeCache.GetOrAdd (typeName, fun typeName -> createRootType (assembly, nameSpace, typeName, unbox args.[0], unbox args.[1], unbox args.[2], unbox args.[3], unbox args.[4], unbox args.[5], unbox args.[6], unbox args.[7], unbox args.[8], unbox args.[9]))) providerType.AddXmlDoc """ Typed access to PostgreSQL programmable objects, tables and functions. @@ -220,6 +230,9 @@ let internal getProviderType (assembly, nameSpace) = Indicates whether to generate Execute, AsyncExecute or both methods for commands. Indicates whether rows should be returned in a list, array or ResizeArray. The time to wait (in seconds) while trying to execute a command before terminating the attempt and generating an error. Set to zero for infinity. +The number of attempts alotted for a database operation. Set to 0 for infinity. +The time to wait (in milliseconds) while waiting to retry a databased operation before terminating the attempt and generating an error. Set to zero for infinity. +Whether Async functions perform Async.Catch implcity and return Choice<'a, Exception> rather than 'a. """ providerType diff --git a/src/DesignTime/QuotationsFactory.fs b/src/DesignTime/QuotationsFactory.fs index bb1b791..30851bb 100644 --- a/src/DesignTime/QuotationsFactory.fs +++ b/src/DesignTime/QuotationsFactory.fs @@ -5,7 +5,9 @@ open System.Data open System.Reflection open FSharp.Quotations open ProviderImplementation.ProvidedTypes +open ProviderImplementation.ProvidedTypes.UncheckedQuotations open Npgsql +open FSharp open FSharp.Data.Npgsql open InformationSchema open System.Collections.Concurrent @@ -14,6 +16,7 @@ open System.Threading.Tasks type internal ReturnType = { Single: Type RowProvidedType: Type option + AsyncChoice: bool } type internal Statement = { @@ -82,7 +85,8 @@ type internal QuotationsFactory () = static member GetMapperFromOptionToObj (t: Type, value: Expr) = Expr.Call (typeof.GetMethod(nameof Utils.OptionToObj).MakeGenericMethod t, [ Expr.Coerce (value, typeof) ]) - static member AddGeneratedMethod (sqlParameters: Parameter list, executeArgs: ProvidedParameter list, erasedType, providedOutputType, name) = + static member AddGeneratedMethod (sqlParameters: Parameter list, executeArgs: ProvidedParameter list, erasedType, providedOutputType: Type, asyncChoice, name) = + let mappedInputParamValues (exprArgs: Expr list) = (exprArgs.Tail, sqlParameters) ||> List.map2 (fun expr param -> @@ -105,9 +109,21 @@ type internal QuotationsFactory () = let invokeCode exprArgs = let vals = mappedInputParamValues exprArgs let paramValues = if vals.IsEmpty then QuotationsFactory.ParamArrayEmptyExpr else Expr.NewArray (typeof, vals) - Expr.Call (Expr.Coerce (exprArgs.[0], erasedType), typeof.GetMethod name, [ paramValues ]) - - ProvidedMethod(name, executeArgs, providedOutputType, invokeCode) + let callWithoutChoice = Expr.Call (Expr.Coerce (exprArgs.[0], erasedType), typeof.GetMethod name, [ paramValues ]) + if asyncChoice && + providedOutputType.Name = (async { return () }).GetType().Name then + Expr.CallUnchecked (ProvidedTypeBuilder.MakeGenericMethod (typeof.GetMethod "Catch", [providedOutputType.GenericTypeArguments.[0]]), [callWithoutChoice]) + else callWithoutChoice + + let outputType = + if asyncChoice && + providedOutputType.Name = (async { return () }).GetType().Name then + let choiceType = ProvidedTypeBuilder.MakeGenericType (typedefof>, [providedOutputType.GenericTypeArguments.[0]; typeof]) + let asyncType = ProvidedTypeBuilder.MakeGenericType ((async { return () }).GetType().GetGenericTypeDefinition(), [choiceType]) + asyncType + else providedOutputType + + ProvidedMethod(name, executeArgs, outputType, invokeCode) static member GetRecordType (rootTypeName, columns: Column list, customTypes: Map, typeNameSuffix, providedTypeReuse) = columns @@ -321,14 +337,14 @@ type internal QuotationsFactory () = tableType - static member GetOutputTypes (rootTypeName, sql, statementType, customTypes: Map, resultType, collectionType, singleRow, typeNameSuffix, providedTypeReuse) = + static member GetOutputTypes (rootTypeName, sql, statementType, customTypes: Map, resultType, collectionType, singleRow, typeNameSuffix, providedTypeReuse, asyncChoice) = let returnType = match resultType, statementType with | ResultType.DataReader, _ | _, Control -> None | _, NonQuery -> - Some { Single = typeof; RowProvidedType = None } + Some { Single = typeof; RowProvidedType = None; AsyncChoice = asyncChoice } | ResultType.DataTable, Query columns -> let dataRowType = QuotationsFactory.GetDataRowType (customTypes, columns) let dataTableType = @@ -340,7 +356,7 @@ type internal QuotationsFactory () = dataTableType.AddMember dataRowType - Some { Single = dataTableType; RowProvidedType = None } + Some { Single = dataTableType; RowProvidedType = None; AsyncChoice = asyncChoice } | _, Query columns -> let providedRowType = if List.length columns = 1 then @@ -364,7 +380,8 @@ type internal QuotationsFactory () = ProvidedTypeBuilder.MakeGenericType (typedefof>, [ providedRowType ]) else ProvidedTypeBuilder.MakeGenericType (typedefof<_ list>, [ providedRowType ]) - RowProvidedType = Some providedRowType } + RowProvidedType = Some providedRowType + AsyncChoice = asyncChoice } { Type = statementType; Sql = sql; ReturnType = returnType } @@ -440,12 +457,12 @@ type internal QuotationsFactory () = | _ -> QuotationsFactory.DataColumnArrayEmptyExpr)) - static member AddTopLevelTypes (cmdProvidedType: ProvidedTypeDefinition) parameters resultType (methodTypes: MethodTypes) customTypes statements typeToAttachTo = + static member AddTopLevelTypes (cmdProvidedType: ProvidedTypeDefinition) parameters resultType (methodTypes: MethodTypes) customTypes statements typeToAttachTo asyncChoice = let executeArgs = QuotationsFactory.GetExecuteArgs (parameters, customTypes) let addRedirectToISqlCommandMethods outputType xmlDoc = let add outputType name xmlDoc = - let m = QuotationsFactory.AddGeneratedMethod (parameters, executeArgs, cmdProvidedType.BaseType, outputType, name) + let m = QuotationsFactory.AddGeneratedMethod (parameters, executeArgs, cmdProvidedType.BaseType, outputType, asyncChoice, name) Option.iter m.AddXmlDoc xmlDoc cmdProvidedType.AddMember m @@ -455,6 +472,18 @@ type internal QuotationsFactory () = add (typedefof>.MakeGenericType outputType) "AsyncExecute" xmlDoc if methodTypes.HasFlag MethodTypes.Task then add (typedefof>.MakeGenericType outputType) "TaskAsyncExecute" xmlDoc + + let evt = + let evtName = "RetryEvent" + let evtType = typeof> + let erasedType = cmdProvidedType.BaseType + ProvidedEvent ( + evtName, + evtType, + (fun args -> Expr.Call (Expr.Coerce (args.[0], erasedType), typeof.GetMethod ("add_" + evtName), [Expr.Coerce (args.[1], evtType)])), + (fun args -> Expr.Call (Expr.Coerce (args.[0], erasedType), typeof.GetMethod ("remove_" + evtName), [Expr.Coerce (args.[1], evtType)])), + false) + cmdProvidedType.AddMember evt match statements with | _ when resultType = ResultType.DataReader -> diff --git a/src/Runtime/ISqlCommand.fs b/src/Runtime/ISqlCommand.fs index 277c7cc..58ca643 100644 --- a/src/Runtime/ISqlCommand.fs +++ b/src/Runtime/ISqlCommand.fs @@ -16,6 +16,7 @@ type internal ExecutionType = [] type ISqlCommand = + [] abstract RetryEvent: IEvent abstract Execute: parameters: (string * obj)[] -> obj abstract AsyncExecute: parameters: (string * obj)[] -> obj abstract TaskAsyncExecute: parameters: (string * obj)[] -> obj @@ -29,10 +30,12 @@ type DesignTimeConfig = { SingleRow: bool ResultSets: ResultSetDefinition[] Prepare: bool - CommandTimeout : int + CommandTimeout: int + Tries: int + RetryWaitTime: int } with - static member Create (sql, ps, resultType, collection, singleRow, (columns: DataColumn[][]), prepare, commandTimeout) = { + static member Create (sql, ps, resultType, collection, singleRow, (columns: DataColumn[][]), prepare, commandTimeout, tries, retryWaitTime) = { SqlStatement = sql Parameters = ps ResultType = resultType @@ -40,7 +43,9 @@ type DesignTimeConfig = { SingleRow = singleRow ResultSets = columns |> Array.map (fun r -> CreateResultSetDefinition (r, resultType)) Prepare = prepare - CommandTimeout = commandTimeout } + CommandTimeout = commandTimeout + Tries = tries + RetryWaitTime = retryWaitTime } [] type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> DesignTimeConfig, connection, commandTimeout) = @@ -103,22 +108,18 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design p.Clone () |> cmd.Parameters.Add |> ignore cmd + let retryEvent = + Event () + + let mutable retryCallback = + fun (_ : Exception) -> () + static let getReaderBehavior (connection, cfg) = // Don't pass CommandBehavior.SingleRow to Npgsql, because it only applies to the first row of the first result set and all other result sets are completely ignored if cfg.SingleRow && cfg.ResultSets.Length = 1 then CommandBehavior.SingleRow else CommandBehavior.Default ||| if cfg.ResultType = ResultType.DataTable then CommandBehavior.KeyInfo else CommandBehavior.Default ||| match connection with Choice1Of2 _ -> CommandBehavior.CloseConnection | _ -> CommandBehavior.Default - static let setupConnection (cmd: NpgsqlCommand, connection) = - match connection with - | Choice2Of2 (conn, tx) -> - cmd.Connection <- conn - cmd.Transaction <- tx - System.Threading.Tasks.Task.CompletedTask - | Choice1Of2 connectionString -> - cmd.Connection <- new NpgsqlConnection (connectionString) - cmd.Connection.OpenAsync () - static let mapTask (t: Ply.Ply<_>, executionType) = let t = task { return! t } @@ -128,9 +129,10 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design | TaskAsync -> box t interface ISqlCommand with - member _.Execute parameters = execute (cfg, cmd, connection, parameters, Sync) - member _.AsyncExecute parameters = execute (cfg, cmd, connection, parameters, Async) - member _.TaskAsyncExecute parameters = execute (cfg, cmd, connection, parameters, TaskAsync) + [] member _.RetryEvent = retryEvent.Publish + member _.Execute parameters = execute (retryEvent, cfg, cmd, connection, parameters, Sync) + member _.AsyncExecute parameters = execute (retryEvent, cfg, cmd, connection, parameters, Async) + member _.TaskAsyncExecute parameters = execute (retryEvent, cfg, cmd, connection, parameters, TaskAsync) interface IDisposable with member _.Dispose () = @@ -168,32 +170,33 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design cursor.Close() invalidOp message - static member internal AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters) = Unsafe.uply { + static member internal AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters) = Unsafe.uply { ISqlCommandImplementation.SetParameters (cmd, parameters) - do! setupConnection (cmd, connection) + do! Utils.SetupConnectionAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cmd, connection) let readerBehavior = getReaderBehavior (connection, cfg) if cfg.Prepare then - do! cmd.PrepareAsync () + do! Utils.PrepareAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cmd) - let! cursor = cmd.ExecuteReaderAsync readerBehavior + let! cursor = Utils.ExecuteReaderAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, readerBehavior, cmd) return cursor :?> NpgsqlDataReader } - static member internal AsyncExecuteReader (cfg, cmd, connection, parameters, executionType) = - mapTask (ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters), executionType) + static member internal AsyncExecuteReader (retryEvent, cfg, cmd, connection, parameters, executionType) = + mapTask (ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters), executionType) + + static member internal LoadDataTable retryEvent cfg (cursor: Common.DbDataReader) (cmd: NpgsqlCommand) (columns: DataColumn[]) = - static member internal LoadDataTable (cursor: Common.DbDataReader) cmd (columns: DataColumn[]) = let result = new FSharp.Data.Npgsql.DataTable(selectCommand = cmd) for c in columns do CloneDataColumn c |> result.Columns.Add - result.Load cursor + Utils.LoadDataTable (cfg.Tries, cfg.RetryWaitTime, retryEvent, cursor, cmd, result) result - static member internal AsyncExecuteDataTables (cfg, cmd, connection, parameters, executionType) = + static member internal AsyncExecuteDataTables (retryEvent, cfg, cmd, connection, parameters, executionType) = let t = Unsafe.uply { - use! cursor = ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters) + use! cursor = ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters) // No explicit NextResult calls, Load takes care of it let results = @@ -203,23 +206,23 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design null else ISqlCommandImplementation.VerifyOutputColumns(cursor, resultSet.ExpectedColumns) - ISqlCommandImplementation.LoadDataTable cursor (cmd.Clone()) resultSet.ExpectedColumns |> box) + ISqlCommandImplementation.LoadDataTable retryEvent cfg cursor (cmd.Clone()) resultSet.ExpectedColumns |> box) ISqlCommandImplementation.SetNumberOfAffectedRows (results, cmd.Statements) return results } mapTask (t, executionType) - static member internal AsyncExecuteDataTable (cfg, cmd, connection, parameters, executionType) = + static member internal AsyncExecuteDataTable (retryEvent, cfg, cmd, connection, parameters, executionType) = let t = Unsafe.uply { - use! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters) - return ISqlCommandImplementation.LoadDataTable reader (cmd.Clone()) cfg.ResultSets.[0].ExpectedColumns } + use! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters) + return ISqlCommandImplementation.LoadDataTable retryEvent cfg reader (cmd.Clone()) cfg.ResultSets.[0].ExpectedColumns } mapTask (t, executionType) // TODO output params - static member internal ExecuteSingle<'TItem> () = Func<_, _, _, _>(fun reader resultSetDefinition cfg -> Unsafe.uply { - let! xs = MapRowValues<'TItem> (reader, cfg.ResultType, resultSetDefinition) + static member internal ExecuteSingle<'TItem> () = Func<_, _, _, _, _>(fun reader resultSetDefinition retryEvent cfg -> Unsafe.uply { + let! xs = MapRowValues<'TItem> (cfg.Tries, cfg.RetryWaitTime, retryEvent, reader, cfg.ResultType, resultSetDefinition) return if cfg.SingleRow then @@ -231,24 +234,24 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design else box xs }) - static member internal AsyncExecuteList<'TItem> () = fun (cfg, cmd, connection, parameters, executionType) -> + static member internal AsyncExecuteList<'TItem> () = fun (retryEvent, cfg, cmd, connection, parameters, executionType) -> if cfg.CollectionType = CollectionType.LazySeq && not cfg.SingleRow then let t = Unsafe.uply { - let! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters) + let! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters) let xs = if cfg.ResultSets.[0].ExpectedColumns.Length > 1 then - MapRowValuesOntoTupleLazy<'TItem> (reader, cfg.ResultType, cfg.ResultSets.[0]) + MapRowValuesOntoTupleLazy<'TItem> (cfg.Tries, cfg.RetryWaitTime, retryEvent, reader, cfg.ResultType, cfg.ResultSets.[0]) else - MapRowValuesLazy<'TItem> (reader, cfg.ResultSets.[0]) + MapRowValuesLazy<'TItem> (cfg.Tries, cfg.RetryWaitTime, retryEvent, reader, cfg.ResultSets.[0]) return new LazySeq<'TItem> (xs, reader, cmd) } mapTask (t, executionType) else let xs = Unsafe.uply { - use! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters) - return! MapRowValues<'TItem> (reader, cfg.ResultType, cfg.ResultSets.[0]) } + use! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters) + return! MapRowValues<'TItem> (cfg.Tries, cfg.RetryWaitTime, retryEvent, reader, cfg.ResultType, cfg.ResultSets.[0]) } if cfg.SingleRow then let t = Unsafe.uply { @@ -271,7 +274,7 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design else mapTask (xs, executionType) - static member private ReadResultSet (cursor: Common.DbDataReader, resultSetDefinition, cfg) = + static member private ReadResultSet (cursor: Common.DbDataReader, resultSetDefinition, retryEvent, cfg) = ISqlCommandImplementation.VerifyOutputColumns(cursor, resultSetDefinition.ExpectedColumns) let func = @@ -283,16 +286,16 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design typeof .GetMethod(nameof ISqlCommandImplementation.ExecuteSingle, BindingFlags.NonPublic ||| BindingFlags.Static) .MakeGenericMethod(resultSetDefinition.ErasedRowType) - .Invoke(null, [||]) :?> Func<_, _, _, Ply.Ply> + .Invoke(null, [||]) :?> Func<_, _, _, _, Ply.Ply> executeSingleCache.[resultSetDefinition.ErasedRowType] <- func func - func.Invoke (cursor, resultSetDefinition, cfg) + func.Invoke (cursor, resultSetDefinition, retryEvent, cfg) - static member internal AsyncExecuteMulti (cfg, cmd, connection, parameters, executionType) = + static member internal AsyncExecuteMulti (retryEvent, cfg, cmd, connection, parameters, executionType) = let t = Unsafe.uply { - use! cursor = ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters) + use! cursor = ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters) let results = Array.zeroCreate cmd.Statements.Count // Command contains at least one query @@ -301,9 +304,9 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design while go do let currentStatement = GetStatementIndex.Invoke cursor - let! res = ISqlCommandImplementation.ReadResultSet (cursor, cfg.ResultSets.[currentStatement], cfg) + let! res = ISqlCommandImplementation.ReadResultSet (cursor, cfg.ResultSets.[currentStatement], retryEvent, cfg) results.[currentStatement] <- res - let! more = cursor.NextResultAsync () + let! more = Utils.NextResultAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cursor) go <- more ISqlCommandImplementation.SetNumberOfAffectedRows (results, cmd.Statements) @@ -311,17 +314,17 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design mapTask (t, executionType) - static member internal AsyncExecuteNonQuery (cfg, cmd, connection, parameters, executionType) = + static member internal AsyncExecuteNonQuery (retryEvent, cfg, cmd, connection, parameters, executionType) = let t = Unsafe.uply { ISqlCommandImplementation.SetParameters (cmd, parameters) - do! setupConnection (cmd, connection) + do! Utils.SetupConnectionAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cmd, connection) let readerBehavior = getReaderBehavior (connection, cfg) use _ = if readerBehavior.HasFlag CommandBehavior.CloseConnection then cmd.Connection else null if cfg.Prepare then - do! cmd.PrepareAsync () + do! Utils.PrepareAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cmd) - return! cmd.ExecuteNonQueryAsync () } + return! Utils.ExecuteNonQueryAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cmd) } mapTask (t, executionType) diff --git a/src/Runtime/Runtime.fsproj b/src/Runtime/Runtime.fsproj index a108e4c..4b5b2af 100644 --- a/src/Runtime/Runtime.fsproj +++ b/src/Runtime/Runtime.fsproj @@ -9,7 +9,7 @@ true true true - true + false 101 preview diff --git a/src/Runtime/Utils.fs b/src/Runtime/Utils.fs index d342857..ca4c6fe 100644 --- a/src/Runtime/Utils.fs +++ b/src/Runtime/Utils.fs @@ -5,15 +5,156 @@ open System.Data open System.Data.Common open System.Collections.Concurrent open System.ComponentModel +open System.Linq.Expressions open Npgsql open NpgsqlTypes open FSharp.Control.Tasks.NonAffine -open System.Linq.Expressions #nowarn "0025" +[] +module internal LocalExtensions = + + type String with + + member this.ErrorClass = + if this.Length >= 2 + then this.Substring 2 + else raise (InvalidOperationException ()) + +[] +module internal Retry = + + let ShouldRetry (triesCurrent, triesMax) = + triesMax <= 0 || triesCurrent < triesMax + + let ShouldRetryWithConnection (triesCurrent, triesMax, connection: NpgsqlConnection) = + ShouldRetry (triesCurrent, triesMax) && + (connection.State &&& ConnectionState.Open = ConnectionState.Open) + + let rec ShouldRetryException (exn: Exception) = + match exn with + | :? PostgresException as pgexn -> + let sqlState = pgexn.SqlState + let errorClass = sqlState.ErrorClass + if sqlState = PostgresErrorCodes.IoError || + sqlState = PostgresErrorCodes.DeadlockDetected || + sqlState = PostgresErrorCodes.LockNotAvailable || + sqlState = PostgresErrorCodes.TransactionIntegrityConstraintViolation || + sqlState = PostgresErrorCodes.InFailedSqlTransaction || + sqlState = PostgresErrorCodes.TooManyConnections || + errorClass = PostgresErrorCodes.ConnectionException.ErrorClass || + errorClass = PostgresErrorCodes.InsufficientResources.ErrorClass then + true + else false + | :? NpgsqlException -> + true + | :? AggregateException as aggexn -> + Seq.forall ShouldRetryException aggexn.InnerExceptions + | _ -> false + +[] +module internal Async = + + let CatchDb a = + async { + try + let! result = a + return Choice1Of2 result + with exn when Retry.ShouldRetryException exn -> + return Choice2Of2 exn } + [] type Utils () = + + static let rec LoadDataTable' (triesCurrent, exns, triesMax, retryWaitTime: int, retryEvent: Event, cursor, cmd: NpgsqlCommand, result: DataRow DataTable) = + try result.Load cursor + with exn when Retry.ShouldRetryException exn -> + if Retry.ShouldRetryWithConnection (triesCurrent, triesMax, cmd.Connection) then + // NOTE: doing a Thread.Sleep here doesn't help. + // I am not convinced this code is meant to be run parallel. + retryEvent.Trigger exn + LoadDataTable' (triesCurrent + 1, exn :: exns, triesMax, retryWaitTime, retryEvent, cursor, cmd, result) + else raise (AggregateException (Seq.rev exns)) + + static let rec SetupConnectionAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, cmd: NpgsqlCommand, connection) = + async { + match connection with + | Choice1Of2 connectionString -> + cmd.Connection <- new NpgsqlConnection (connectionString) + let! choice = cmd.Connection.OpenAsync () |> Async.AwaitTask |> Async.CatchDb + match choice with + | Choice1Of2 () -> () + | Choice2Of2 exn -> + if Retry.ShouldRetry (triesCurrent, triesMax) then + do! Async.Sleep retryWaitTime + retryEvent.Trigger exn + do! SetupConnectionAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, cmd, connection) + else return raise (AggregateException (Seq.rev exns)) + | Choice2Of2 (conn, tx) -> + cmd.Connection <- conn + cmd.Transaction <- tx } + + static let rec ReadAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, cursor: DbDataReader) = + async { + let! choice = cursor.ReadAsync () |> Async.AwaitTask |> Async.CatchDb + match choice with + | Choice1Of2 go -> return go + | Choice2Of2 exn -> + if Retry.ShouldRetry (triesCurrent, triesMax) then + do! Async.Sleep retryWaitTime + retryEvent.Trigger exn + return! ReadAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, cursor) + else return raise (AggregateException (Seq.rev exns)) } + + static let rec NextResultAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, cursor: DbDataReader) = + async { + let! choice = cursor.NextResultAsync () |> Async.AwaitTask |> Async.CatchDb + match choice with + | Choice1Of2 go -> return go + | Choice2Of2 exn -> + if Retry.ShouldRetry (triesCurrent, triesMax) then + do! Async.Sleep retryWaitTime + retryEvent.Trigger exn + return! NextResultAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, cursor) + else return raise (AggregateException (Seq.rev exns)) } + + static let rec PrepareAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, cmd: NpgsqlCommand) = + async { + let! choice = cmd.PrepareAsync () |> Async.AwaitTask |> Async.CatchDb + match choice with + | Choice1Of2 () -> return () + | Choice2Of2 exn -> + if Retry.ShouldRetryWithConnection (triesCurrent, triesMax, cmd.Connection) then + do! Async.Sleep retryWaitTime + retryEvent.Trigger exn + return! PrepareAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, cmd) + else return raise (AggregateException (Seq.rev exns)) } + + static let rec ExecuteReaderAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, behavior: CommandBehavior, cmd: NpgsqlCommand) = + async { + let! choice = cmd.ExecuteReaderAsync behavior |> Async.AwaitTask |> Async.CatchDb + match choice with + | Choice1Of2 reader -> return reader + | Choice2Of2 exn -> + if Retry.ShouldRetryWithConnection (triesCurrent, triesMax, cmd.Connection) then + do! Async.Sleep retryWaitTime + retryEvent.Trigger exn + return! ExecuteReaderAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, behavior, cmd) + else return raise (AggregateException (Seq.rev exns)) } + + static let rec ExecuteNonQueryAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, cmd: NpgsqlCommand) = + async { + let! choice = cmd.ExecuteNonQueryAsync () |> Async.AwaitTask |> Async.CatchDb + match choice with + | Choice1Of2 rowsAffected -> return rowsAffected + | Choice2Of2 exn -> + if Retry.ShouldRetryWithConnection (triesCurrent, triesMax, cmd.Connection) then + do! Async.Sleep retryWaitTime + retryEvent.Trigger exn + return! ExecuteNonQueryAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, cmd) + else return raise (AggregateException (Seq.rev exns)) } + static let getColumnMapping = let cache = ConcurrentDictionary obj> () let factory = Func<_, _>(fun (typeParam: Type) -> @@ -78,6 +219,33 @@ type Utils () = cache.[resultSet.ExpectedColumns.GetHashCode ()] <- func func + static member LoadDataTable (tries, retryWaitTime, retryEvent, cursor, cmd, result) = + LoadDataTable' (0, [], tries, retryWaitTime, retryEvent, cursor, cmd, result) + + static member SetupConnectionAsync (tries, retryWaitTime, retryEvent, cmd, connection) = + async { + do! SetupConnectionAsync' (0, [], tries, retryWaitTime, retryEvent, cmd, connection) } + + static member ReadAsync (tries, retryWaitTime, retryEvent, cursor) = + async { + return! ReadAsync' (0, [], tries, retryWaitTime, retryEvent, cursor) } + + static member NextResultAsync (tries, retryWaitTime, retryEvent, cursor) = + async { + return! NextResultAsync' (0, [], tries, retryWaitTime, retryEvent, cursor) } + + static member PrepareAsync (tries, retryWaitTime, retryEvent, cmd) = + async { + return! PrepareAsync' (0, [], tries, retryWaitTime, retryEvent, cmd) } + + static member ExecuteReaderAsync (tries, retryWaitTime, retryEvent, behavior, cmd) = + async { + return! ExecuteReaderAsync' (0, [], tries, retryWaitTime, retryEvent, behavior, cmd) } + + static member ExecuteNonQueryAsync (tries, retryWaitTime, retryEvent, cmd) = + async { + return! ExecuteNonQueryAsync' (0, [], tries, retryWaitTime, retryEvent, cmd) } + static member ResizeArrayToList ra = let rec inner (ra: ResizeArray<'a>, index, acc) = if index = 0 then @@ -186,11 +354,11 @@ type Utils () = let [| columnName; typeName; nullable |] = stringValues.Split '|' new DataColumn (columnName, Utils.GetType typeName, AllowDBNull = (nullable = "1")) - static member MapRowValuesOntoTuple<'TItem> (cursor: DbDataReader, resultType, resultSet) = Unsafe.uply { + static member MapRowValuesOntoTuple<'TItem> (tries, retryWaitTime, retryEvent, cursor: DbDataReader, resultType, resultSet) = Unsafe.uply { let results = ResizeArray<'TItem> () let rowReader = getRowToTupleReader resultSet (resultType = ResultType.Records) - let! go = cursor.ReadAsync () + let! go = Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor) let mutable go = go while go do @@ -198,27 +366,27 @@ type Utils () = |> unbox |> results.Add - let! cont = cursor.ReadAsync () + let! cont = Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor) go <- cont return results } - static member MapRowValuesOntoTupleLazy<'TItem> (cursor: DbDataReader, resultType, resultSet) = + static member MapRowValuesOntoTupleLazy<'TItem> (tries, retryWaitTime, retryEvent, cursor: DbDataReader, resultType, resultSet) = seq { let rowReader = getRowToTupleReader resultSet (resultType = ResultType.Records) - while cursor.Read () do + while Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor) |> Async.RunSynchronously do rowReader.Invoke cursor |> unbox<'TItem> } - static member MapRowValues<'TItem> (cursor: DbDataReader, resultType, resultSet: ResultSetDefinition) = + static member MapRowValues<'TItem> (tries, retryWaitTime, retryEvent, cursor: DbDataReader, resultType, resultSet: ResultSetDefinition) = if resultSet.ExpectedColumns.Length > 1 then - Utils.MapRowValuesOntoTuple<'TItem> (cursor, resultType, resultSet) + Utils.MapRowValuesOntoTuple<'TItem> (tries, retryWaitTime, retryEvent, cursor, resultType, resultSet) else Unsafe.uply { let columnMapping = getColumnMapping resultSet.ExpectedColumns.[0] let results = ResizeArray<'TItem> () - let! go = cursor.ReadAsync () + let! go = Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor) let mutable go = go while go do @@ -227,16 +395,16 @@ type Utils () = |> unbox |> results.Add - let! cont = cursor.ReadAsync () + let! cont = Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor) go <- cont return results } - static member MapRowValuesLazy<'TItem> (cursor: DbDataReader, resultSet) = + static member MapRowValuesLazy<'TItem> (tries, retryWaitTime, retryEvent, cursor: DbDataReader, resultSet) = seq { let columnMapping = getColumnMapping resultSet.ExpectedColumns.[0] - while cursor.Read () do + while Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor) |> Async.RunSynchronously do cursor.GetValue 0 |> columnMapping |> unbox<'TItem> diff --git a/tests/NpgsqlConnectionTests.fs b/tests/NpgsqlConnectionTests.fs index 31db387..0482c87 100644 --- a/tests/NpgsqlConnectionTests.fs +++ b/tests/NpgsqlConnectionTests.fs @@ -2,8 +2,9 @@ module NpgsqlConnectionTests open System open Xunit -open FSharp.Data.Npgsql open System.Reflection +open System.Threading.Tasks +open FSharp.Data.Npgsql open type Npgsql.NpgsqlNetTopologySuiteExtensions open NetTopologySuite.Geometries @@ -151,6 +152,89 @@ let paramInLimit() = [] let getRentalById = "SELECT return_date FROM rental WHERE rental_id = @id" +[] +let retry () = + let chc = + seq { + for _ in 1 .. 10 do + let connectionStrWithIncorrectPort = "Host=localhost;Username=postgres;Password=postgres;Database=dvdrental;Port=1313" + let cmd = DvdRental.CreateCommand<"SELECT * FROM rental", ResultType.DataTable, Tries = 5> connectionStrWithIncorrectPort + yield async { + let! result = cmd.AsyncExecute () + (cmd :> IDisposable).Dispose () + return result }} + |> Async.Parallel + |> Async.Catch + |> Async.RunSynchronously + let isExpectedAggregateException = + match chc with + | Choice2Of2 exn -> + match exn with + | :? AggregateException as aggexn -> + match aggexn.InnerException with + | :? AggregateException as aggexn2 -> + match aggexn2.InnerException with + | :? AggregateException as aggexn3 -> + match aggexn3.InnerException with + | :? Npgsql.NpgsqlException -> true + | _ -> false + | _ -> false + | _ -> false + | _ -> false + | Choice1Of2 _ -> false + Assert.True isExpectedAggregateException + +exception ContrivedRetryException of unit + +[] +let retryEvent () = + let chc = + seq { + for _ in 1 .. 10 do + let connectionStrWithIncorrectPort = "Host=localhost;Username=postgres;Password=postgres;Database=dvdrental;Port=1313" + let cmd = DvdRental.CreateCommand<"SELECT * FROM rental", ResultType.DataTable, Tries = 5> connectionStrWithIncorrectPort + cmd.add_RetryEvent (fun _ _ -> raise (ContrivedRetryException ())) + yield async { + let! result = cmd.AsyncExecute () + (cmd :> IDisposable).Dispose () + return result }} + |> Async.Parallel + |> Async.Catch + |> Async.RunSynchronously + let isExpectedContrivedException = + match chc with + | Choice2Of2 exn -> + match exn with + | :? AggregateException as aggexn -> + match aggexn.InnerException with + | :? ContrivedRetryException -> true + | _ -> false + | _ -> false + | Choice1Of2 _ -> false + Assert.True isExpectedContrivedException + +type DvdRental' = NpgsqlConnection<"Host=localhost;Username=postgres;Password=postgres;Database=dvdrental;Port=5432", MethodTypes = methodTypes, AsyncChoice = true> + +[] +let retryAsyncChoice () = + let chc = + seq { + for _ in 1 .. 10 do + let connectionStrWithIncorrectPort = "Host=localhost;Username=postgres;Password=postgres;Database=dvdrental;Port=1313" + let cmd = DvdRental'.CreateCommand<"SELECT * FROM rental", ResultType.DataTable, Tries = 5> connectionStrWithIncorrectPort + yield async { + let! result = cmd.AsyncExecute () + (cmd :> IDisposable).Dispose () + return match result with Choice1Of2 r -> r | Choice2Of2 _ -> raise (ContrivedRetryException ()) }} + |> Async.Parallel + |> Async.Catch + |> Async.RunSynchronously + let isExpectedContrivedException = + match chc with + | Choice2Of2 exn -> match exn with :? ContrivedRetryException -> true | _ -> false + | Choice1Of2 _ -> false + Assert.True isExpectedContrivedException + [] let dateTableWithUpdate() =