diff --git a/src/DesignTime/DesignTime.fsproj b/src/DesignTime/DesignTime.fsproj
index 7a21886..0aca126 100644
--- a/src/DesignTime/DesignTime.fsproj
+++ b/src/DesignTime/DesignTime.fsproj
@@ -10,8 +10,7 @@
true
true
true
- true
- 1182
+ false
preview
diff --git a/src/DesignTime/NpgsqlConnectionProvider.fs b/src/DesignTime/NpgsqlConnectionProvider.fs
index 5cead74..1538f8b 100644
--- a/src/DesignTime/NpgsqlConnectionProvider.fs
+++ b/src/DesignTime/NpgsqlConnectionProvider.fs
@@ -11,14 +11,15 @@ open System.Collections.Concurrent
open System.Reflection
let mutable cacheInstanceCount = 0
-let methodsCache = ConcurrentDictionary ()
+let methodsCache = ConcurrentDictionary ()
let typeCache = ConcurrentDictionary ()
let schemaCache = ConcurrentDictionary ()
let addCreateCommandMethod(connectionString, rootType: ProvidedTypeDefinition,
commands: ProvidedTypeDefinition, customTypes: Map,
dbSchemaLookups: DbSchemaLookups, globalXCtor, globalPrepare: bool,
- providedTypeReuse, methodTypes, globalCollectionType: CollectionType, globalCommandTimeout : int) =
+ providedTypeReuse, methodTypes, globalCollectionType: CollectionType, globalCommandTimeout: int,
+ globalTries: int, globalRetryWaitTime: int, globalAsyncChoice: bool) =
let staticParams =
[
@@ -31,21 +32,23 @@ let addCreateCommandMethod(connectionString, rootType: ProvidedTypeDefinition,
if not globalXCtor then yield ProvidedStaticParameter("XCtor", typeof, false)
yield ProvidedStaticParameter("Prepare", typeof, globalPrepare)
yield ProvidedStaticParameter("CommandTimeout", typeof, globalCommandTimeout)
+ yield ProvidedStaticParameter("Tries", typeof, globalTries)
+ yield ProvidedStaticParameter("RetryWaitTime", typeof, globalRetryWaitTime)
]
let m = ProvidedMethod("CreateCommand", [], typeof, isStatic = true)
m.DefineStaticParameters(staticParams, (fun methodName args ->
- let sqlStatement, resultType, collectionType, singleRow, allParametersOptional, typename, xctor, (prepare: bool), (commandTimeout : int) =
+ let sqlStatement, resultType, collectionType, singleRow, allParametersOptional, typename, xctor, (prepare: bool), (commandTimeout: int), (tries: int), (retryWaitTime: int) =
if not globalXCtor then
- args.[0] :?> _ , args.[1] :?> _, args.[2] :?> _, args.[3] :?> _, args.[4] :?> _, args.[5] :?> _, args.[6] :?> _, args.[7] :?> _, args.[8] :?> _
+ args.[0] :?> _ , args.[1] :?> _, args.[2] :?> _, args.[3] :?> _, args.[4] :?> _, args.[5] :?> _, args.[6] :?> _, args.[7] :?> _, args.[8] :?> _, args.[9] :?> _, args.[10] :?> _
else
- args.[0] :?> _ , args.[1] :?> _, args.[2] :?> _, args.[3] :?> _, args.[4] :?> _, args.[5] :?> _, true, args.[6] :?> _, args.[7] :?> _
+ args.[0] :?> _ , args.[1] :?> _, args.[2] :?> _, args.[3] :?> _, args.[4] :?> _, args.[5] :?> _, true, args.[6] :?> _, args.[7] :?> _, args.[8] :?> _, args.[9] :?> _
//let methodName = Regex.Replace(methodName, @"\s+", " ", RegexOptions.Multiline).Replace("\"", "").Replace("@", ":").Replace("CreateCommand,CommandText=", "").Trim()
let commandTypeName = if typename <> "" then typename else methodName
methodsCache.GetOrAdd(
- commandTypeName,
+ (commandTypeName, globalAsyncChoice),
fun _ ->
if singleRow && not (resultType = ResultType.Records || resultType = ResultType.Tuples) then
invalidArg "SingleRow" "SingleRow can be set only for ResultType.Records or ResultType.Tuples."
@@ -66,13 +69,15 @@ let addCreateCommandMethod(connectionString, rootType: ProvidedTypeDefinition,
collectionType,
singleRow,
(if statements.Length > 1 then (i + 1).ToString () else ""),
- providedTypeReuse))
+ providedTypeReuse,
+ globalAsyncChoice))
let cmdProvidedType = ProvidedTypeDefinition (commandTypeName, Some typeof, hideObjectMethods = true)
commands.AddMember cmdProvidedType
QuotationsFactory.AddTopLevelTypes cmdProvidedType parameters resultType methodTypes customTypes statements
(if resultType <> ResultType.Records || providedTypeReuse = NoReuse then cmdProvidedType else rootType)
+ globalAsyncChoice
let designTimeConfig =
Expr.Lambda (Var ("x", typeof),
@@ -85,6 +90,8 @@ let addCreateCommandMethod(connectionString, rootType: ProvidedTypeDefinition,
QuotationsFactory.BuildDataColumnsExpr (statements, resultType <> ResultType.DataTable)
Expr.Value prepare
Expr.Value commandTimeout
+ Expr.Value tries
+ Expr.Value retryWaitTime
]))
let method = QuotationsFactory.GetCommandFactoryMethod (cmdProvidedType, designTimeConfig, xctor, commandTypeName)
@@ -160,7 +167,7 @@ let createTableTypes(customTypes : Map, item: Db
tables
-let createRootType (assembly, nameSpace: string, typeName, connectionString, xctor, prepare, reuseProvidedTypes, methodTypes, collectionType, commandTimeout) =
+let createRootType (assembly, nameSpace: string, typeName, connectionString, xctor, prepare, reuseProvidedTypes, methodTypes, collectionType, commandTimeout, tries, retryWaitTime, asyncChoice) =
if String.IsNullOrWhiteSpace connectionString then invalidArg "Connection" "Value is empty!"
let databaseRootType = ProvidedTypeDefinition (assembly, nameSpace, typeName, baseType = Some typeof, hideObjectMethods = true)
@@ -191,7 +198,7 @@ let createRootType (assembly, nameSpace: string, typeName, connectionString, xct
let commands = ProvidedTypeDefinition("Commands", None)
databaseRootType.AddMember commands
let providedTypeReuse = if reuseProvidedTypes then WithCache typeCache else NoReuse
- addCreateCommandMethod (connectionString, databaseRootType, commands, customTypes, schemaLookups, xctor, prepare, providedTypeReuse, methodTypes, collectionType, commandTimeout)
+ addCreateCommandMethod (connectionString, databaseRootType, commands, customTypes, schemaLookups, xctor, prepare, providedTypeReuse, methodTypes, collectionType, commandTimeout, tries, retryWaitTime, asyncChoice)
databaseRootType
@@ -208,8 +215,11 @@ let internal getProviderType (assembly, nameSpace) =
ProvidedStaticParameter("MethodTypes", typeof, MethodTypes.Sync ||| MethodTypes.Async)
ProvidedStaticParameter("CollectionType", typeof, CollectionType.List)
ProvidedStaticParameter("CommandTimeout", typeof, 0)
+ ProvidedStaticParameter("Tries", typeof, 1)
+ ProvidedStaticParameter("RetryWaitTime", typeof, 1000)
+ ProvidedStaticParameter("AsyncChoice", typeof, false)
],
- fun typeName args -> typeCache.GetOrAdd (typeName, fun typeName -> createRootType (assembly, nameSpace, typeName, unbox args.[0], unbox args.[1], unbox args.[2], unbox args.[3], unbox args.[4], unbox args.[5], unbox args.[6])))
+ fun typeName args -> typeCache.GetOrAdd (typeName, fun typeName -> createRootType (assembly, nameSpace, typeName, unbox args.[0], unbox args.[1], unbox args.[2], unbox args.[3], unbox args.[4], unbox args.[5], unbox args.[6], unbox args.[7], unbox args.[8], unbox args.[9])))
providerType.AddXmlDoc """
Typed access to PostgreSQL programmable objects, tables and functions.
@@ -220,6 +230,9 @@ let internal getProviderType (assembly, nameSpace) =
Indicates whether to generate Execute, AsyncExecute or both methods for commands.
Indicates whether rows should be returned in a list, array or ResizeArray.
The time to wait (in seconds) while trying to execute a command before terminating the attempt and generating an error. Set to zero for infinity.
+The number of attempts alotted for a database operation. Set to 0 for infinity.
+The time to wait (in milliseconds) while waiting to retry a databased operation before terminating the attempt and generating an error. Set to zero for infinity.
+Whether Async functions perform Async.Catch implcity and return Choice<'a, Exception> rather than 'a.
"""
providerType
diff --git a/src/DesignTime/QuotationsFactory.fs b/src/DesignTime/QuotationsFactory.fs
index bb1b791..30851bb 100644
--- a/src/DesignTime/QuotationsFactory.fs
+++ b/src/DesignTime/QuotationsFactory.fs
@@ -5,7 +5,9 @@ open System.Data
open System.Reflection
open FSharp.Quotations
open ProviderImplementation.ProvidedTypes
+open ProviderImplementation.ProvidedTypes.UncheckedQuotations
open Npgsql
+open FSharp
open FSharp.Data.Npgsql
open InformationSchema
open System.Collections.Concurrent
@@ -14,6 +16,7 @@ open System.Threading.Tasks
type internal ReturnType = {
Single: Type
RowProvidedType: Type option
+ AsyncChoice: bool
}
type internal Statement = {
@@ -82,7 +85,8 @@ type internal QuotationsFactory () =
static member GetMapperFromOptionToObj (t: Type, value: Expr) =
Expr.Call (typeof.GetMethod(nameof Utils.OptionToObj).MakeGenericMethod t, [ Expr.Coerce (value, typeof) ])
- static member AddGeneratedMethod (sqlParameters: Parameter list, executeArgs: ProvidedParameter list, erasedType, providedOutputType, name) =
+ static member AddGeneratedMethod (sqlParameters: Parameter list, executeArgs: ProvidedParameter list, erasedType, providedOutputType: Type, asyncChoice, name) =
+
let mappedInputParamValues (exprArgs: Expr list) =
(exprArgs.Tail, sqlParameters)
||> List.map2 (fun expr param ->
@@ -105,9 +109,21 @@ type internal QuotationsFactory () =
let invokeCode exprArgs =
let vals = mappedInputParamValues exprArgs
let paramValues = if vals.IsEmpty then QuotationsFactory.ParamArrayEmptyExpr else Expr.NewArray (typeof, vals)
- Expr.Call (Expr.Coerce (exprArgs.[0], erasedType), typeof.GetMethod name, [ paramValues ])
-
- ProvidedMethod(name, executeArgs, providedOutputType, invokeCode)
+ let callWithoutChoice = Expr.Call (Expr.Coerce (exprArgs.[0], erasedType), typeof.GetMethod name, [ paramValues ])
+ if asyncChoice &&
+ providedOutputType.Name = (async { return () }).GetType().Name then
+ Expr.CallUnchecked (ProvidedTypeBuilder.MakeGenericMethod (typeof.GetMethod "Catch", [providedOutputType.GenericTypeArguments.[0]]), [callWithoutChoice])
+ else callWithoutChoice
+
+ let outputType =
+ if asyncChoice &&
+ providedOutputType.Name = (async { return () }).GetType().Name then
+ let choiceType = ProvidedTypeBuilder.MakeGenericType (typedefof>, [providedOutputType.GenericTypeArguments.[0]; typeof])
+ let asyncType = ProvidedTypeBuilder.MakeGenericType ((async { return () }).GetType().GetGenericTypeDefinition(), [choiceType])
+ asyncType
+ else providedOutputType
+
+ ProvidedMethod(name, executeArgs, outputType, invokeCode)
static member GetRecordType (rootTypeName, columns: Column list, customTypes: Map, typeNameSuffix, providedTypeReuse) =
columns
@@ -321,14 +337,14 @@ type internal QuotationsFactory () =
tableType
- static member GetOutputTypes (rootTypeName, sql, statementType, customTypes: Map, resultType, collectionType, singleRow, typeNameSuffix, providedTypeReuse) =
+ static member GetOutputTypes (rootTypeName, sql, statementType, customTypes: Map, resultType, collectionType, singleRow, typeNameSuffix, providedTypeReuse, asyncChoice) =
let returnType =
match resultType, statementType with
| ResultType.DataReader, _
| _, Control ->
None
| _, NonQuery ->
- Some { Single = typeof; RowProvidedType = None }
+ Some { Single = typeof; RowProvidedType = None; AsyncChoice = asyncChoice }
| ResultType.DataTable, Query columns ->
let dataRowType = QuotationsFactory.GetDataRowType (customTypes, columns)
let dataTableType =
@@ -340,7 +356,7 @@ type internal QuotationsFactory () =
dataTableType.AddMember dataRowType
- Some { Single = dataTableType; RowProvidedType = None }
+ Some { Single = dataTableType; RowProvidedType = None; AsyncChoice = asyncChoice }
| _, Query columns ->
let providedRowType =
if List.length columns = 1 then
@@ -364,7 +380,8 @@ type internal QuotationsFactory () =
ProvidedTypeBuilder.MakeGenericType (typedefof>, [ providedRowType ])
else
ProvidedTypeBuilder.MakeGenericType (typedefof<_ list>, [ providedRowType ])
- RowProvidedType = Some providedRowType }
+ RowProvidedType = Some providedRowType
+ AsyncChoice = asyncChoice }
{ Type = statementType; Sql = sql; ReturnType = returnType }
@@ -440,12 +457,12 @@ type internal QuotationsFactory () =
| _ ->
QuotationsFactory.DataColumnArrayEmptyExpr))
- static member AddTopLevelTypes (cmdProvidedType: ProvidedTypeDefinition) parameters resultType (methodTypes: MethodTypes) customTypes statements typeToAttachTo =
+ static member AddTopLevelTypes (cmdProvidedType: ProvidedTypeDefinition) parameters resultType (methodTypes: MethodTypes) customTypes statements typeToAttachTo asyncChoice =
let executeArgs = QuotationsFactory.GetExecuteArgs (parameters, customTypes)
let addRedirectToISqlCommandMethods outputType xmlDoc =
let add outputType name xmlDoc =
- let m = QuotationsFactory.AddGeneratedMethod (parameters, executeArgs, cmdProvidedType.BaseType, outputType, name)
+ let m = QuotationsFactory.AddGeneratedMethod (parameters, executeArgs, cmdProvidedType.BaseType, outputType, asyncChoice, name)
Option.iter m.AddXmlDoc xmlDoc
cmdProvidedType.AddMember m
@@ -455,6 +472,18 @@ type internal QuotationsFactory () =
add (typedefof>.MakeGenericType outputType) "AsyncExecute" xmlDoc
if methodTypes.HasFlag MethodTypes.Task then
add (typedefof>.MakeGenericType outputType) "TaskAsyncExecute" xmlDoc
+
+ let evt =
+ let evtName = "RetryEvent"
+ let evtType = typeof>
+ let erasedType = cmdProvidedType.BaseType
+ ProvidedEvent (
+ evtName,
+ evtType,
+ (fun args -> Expr.Call (Expr.Coerce (args.[0], erasedType), typeof.GetMethod ("add_" + evtName), [Expr.Coerce (args.[1], evtType)])),
+ (fun args -> Expr.Call (Expr.Coerce (args.[0], erasedType), typeof.GetMethod ("remove_" + evtName), [Expr.Coerce (args.[1], evtType)])),
+ false)
+ cmdProvidedType.AddMember evt
match statements with
| _ when resultType = ResultType.DataReader ->
diff --git a/src/Runtime/ISqlCommand.fs b/src/Runtime/ISqlCommand.fs
index 277c7cc..58ca643 100644
--- a/src/Runtime/ISqlCommand.fs
+++ b/src/Runtime/ISqlCommand.fs
@@ -16,6 +16,7 @@ type internal ExecutionType =
[]
type ISqlCommand =
+ [] abstract RetryEvent: IEvent
abstract Execute: parameters: (string * obj)[] -> obj
abstract AsyncExecute: parameters: (string * obj)[] -> obj
abstract TaskAsyncExecute: parameters: (string * obj)[] -> obj
@@ -29,10 +30,12 @@ type DesignTimeConfig = {
SingleRow: bool
ResultSets: ResultSetDefinition[]
Prepare: bool
- CommandTimeout : int
+ CommandTimeout: int
+ Tries: int
+ RetryWaitTime: int
}
with
- static member Create (sql, ps, resultType, collection, singleRow, (columns: DataColumn[][]), prepare, commandTimeout) = {
+ static member Create (sql, ps, resultType, collection, singleRow, (columns: DataColumn[][]), prepare, commandTimeout, tries, retryWaitTime) = {
SqlStatement = sql
Parameters = ps
ResultType = resultType
@@ -40,7 +43,9 @@ type DesignTimeConfig = {
SingleRow = singleRow
ResultSets = columns |> Array.map (fun r -> CreateResultSetDefinition (r, resultType))
Prepare = prepare
- CommandTimeout = commandTimeout }
+ CommandTimeout = commandTimeout
+ Tries = tries
+ RetryWaitTime = retryWaitTime }
[]
type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> DesignTimeConfig, connection, commandTimeout) =
@@ -103,22 +108,18 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design
p.Clone () |> cmd.Parameters.Add |> ignore
cmd
+ let retryEvent =
+ Event ()
+
+ let mutable retryCallback =
+ fun (_ : Exception) -> ()
+
static let getReaderBehavior (connection, cfg) =
// Don't pass CommandBehavior.SingleRow to Npgsql, because it only applies to the first row of the first result set and all other result sets are completely ignored
if cfg.SingleRow && cfg.ResultSets.Length = 1 then CommandBehavior.SingleRow else CommandBehavior.Default
||| if cfg.ResultType = ResultType.DataTable then CommandBehavior.KeyInfo else CommandBehavior.Default
||| match connection with Choice1Of2 _ -> CommandBehavior.CloseConnection | _ -> CommandBehavior.Default
- static let setupConnection (cmd: NpgsqlCommand, connection) =
- match connection with
- | Choice2Of2 (conn, tx) ->
- cmd.Connection <- conn
- cmd.Transaction <- tx
- System.Threading.Tasks.Task.CompletedTask
- | Choice1Of2 connectionString ->
- cmd.Connection <- new NpgsqlConnection (connectionString)
- cmd.Connection.OpenAsync ()
-
static let mapTask (t: Ply.Ply<_>, executionType) =
let t = task { return! t }
@@ -128,9 +129,10 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design
| TaskAsync -> box t
interface ISqlCommand with
- member _.Execute parameters = execute (cfg, cmd, connection, parameters, Sync)
- member _.AsyncExecute parameters = execute (cfg, cmd, connection, parameters, Async)
- member _.TaskAsyncExecute parameters = execute (cfg, cmd, connection, parameters, TaskAsync)
+ [] member _.RetryEvent = retryEvent.Publish
+ member _.Execute parameters = execute (retryEvent, cfg, cmd, connection, parameters, Sync)
+ member _.AsyncExecute parameters = execute (retryEvent, cfg, cmd, connection, parameters, Async)
+ member _.TaskAsyncExecute parameters = execute (retryEvent, cfg, cmd, connection, parameters, TaskAsync)
interface IDisposable with
member _.Dispose () =
@@ -168,32 +170,33 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design
cursor.Close()
invalidOp message
- static member internal AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters) = Unsafe.uply {
+ static member internal AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters) = Unsafe.uply {
ISqlCommandImplementation.SetParameters (cmd, parameters)
- do! setupConnection (cmd, connection)
+ do! Utils.SetupConnectionAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cmd, connection)
let readerBehavior = getReaderBehavior (connection, cfg)
if cfg.Prepare then
- do! cmd.PrepareAsync ()
+ do! Utils.PrepareAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cmd)
- let! cursor = cmd.ExecuteReaderAsync readerBehavior
+ let! cursor = Utils.ExecuteReaderAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, readerBehavior, cmd)
return cursor :?> NpgsqlDataReader }
- static member internal AsyncExecuteReader (cfg, cmd, connection, parameters, executionType) =
- mapTask (ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters), executionType)
+ static member internal AsyncExecuteReader (retryEvent, cfg, cmd, connection, parameters, executionType) =
+ mapTask (ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters), executionType)
+
+ static member internal LoadDataTable retryEvent cfg (cursor: Common.DbDataReader) (cmd: NpgsqlCommand) (columns: DataColumn[]) =
- static member internal LoadDataTable (cursor: Common.DbDataReader) cmd (columns: DataColumn[]) =
let result = new FSharp.Data.Npgsql.DataTable(selectCommand = cmd)
for c in columns do
CloneDataColumn c |> result.Columns.Add
- result.Load cursor
+ Utils.LoadDataTable (cfg.Tries, cfg.RetryWaitTime, retryEvent, cursor, cmd, result)
result
- static member internal AsyncExecuteDataTables (cfg, cmd, connection, parameters, executionType) =
+ static member internal AsyncExecuteDataTables (retryEvent, cfg, cmd, connection, parameters, executionType) =
let t = Unsafe.uply {
- use! cursor = ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters)
+ use! cursor = ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters)
// No explicit NextResult calls, Load takes care of it
let results =
@@ -203,23 +206,23 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design
null
else
ISqlCommandImplementation.VerifyOutputColumns(cursor, resultSet.ExpectedColumns)
- ISqlCommandImplementation.LoadDataTable cursor (cmd.Clone()) resultSet.ExpectedColumns |> box)
+ ISqlCommandImplementation.LoadDataTable retryEvent cfg cursor (cmd.Clone()) resultSet.ExpectedColumns |> box)
ISqlCommandImplementation.SetNumberOfAffectedRows (results, cmd.Statements)
return results }
mapTask (t, executionType)
- static member internal AsyncExecuteDataTable (cfg, cmd, connection, parameters, executionType) =
+ static member internal AsyncExecuteDataTable (retryEvent, cfg, cmd, connection, parameters, executionType) =
let t = Unsafe.uply {
- use! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters)
- return ISqlCommandImplementation.LoadDataTable reader (cmd.Clone()) cfg.ResultSets.[0].ExpectedColumns }
+ use! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters)
+ return ISqlCommandImplementation.LoadDataTable retryEvent cfg reader (cmd.Clone()) cfg.ResultSets.[0].ExpectedColumns }
mapTask (t, executionType)
// TODO output params
- static member internal ExecuteSingle<'TItem> () = Func<_, _, _, _>(fun reader resultSetDefinition cfg -> Unsafe.uply {
- let! xs = MapRowValues<'TItem> (reader, cfg.ResultType, resultSetDefinition)
+ static member internal ExecuteSingle<'TItem> () = Func<_, _, _, _, _>(fun reader resultSetDefinition retryEvent cfg -> Unsafe.uply {
+ let! xs = MapRowValues<'TItem> (cfg.Tries, cfg.RetryWaitTime, retryEvent, reader, cfg.ResultType, resultSetDefinition)
return
if cfg.SingleRow then
@@ -231,24 +234,24 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design
else
box xs })
- static member internal AsyncExecuteList<'TItem> () = fun (cfg, cmd, connection, parameters, executionType) ->
+ static member internal AsyncExecuteList<'TItem> () = fun (retryEvent, cfg, cmd, connection, parameters, executionType) ->
if cfg.CollectionType = CollectionType.LazySeq && not cfg.SingleRow then
let t = Unsafe.uply {
- let! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters)
+ let! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters)
let xs =
if cfg.ResultSets.[0].ExpectedColumns.Length > 1 then
- MapRowValuesOntoTupleLazy<'TItem> (reader, cfg.ResultType, cfg.ResultSets.[0])
+ MapRowValuesOntoTupleLazy<'TItem> (cfg.Tries, cfg.RetryWaitTime, retryEvent, reader, cfg.ResultType, cfg.ResultSets.[0])
else
- MapRowValuesLazy<'TItem> (reader, cfg.ResultSets.[0])
+ MapRowValuesLazy<'TItem> (cfg.Tries, cfg.RetryWaitTime, retryEvent, reader, cfg.ResultSets.[0])
return new LazySeq<'TItem> (xs, reader, cmd) }
mapTask (t, executionType)
else
let xs = Unsafe.uply {
- use! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters)
- return! MapRowValues<'TItem> (reader, cfg.ResultType, cfg.ResultSets.[0]) }
+ use! reader = ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters)
+ return! MapRowValues<'TItem> (cfg.Tries, cfg.RetryWaitTime, retryEvent, reader, cfg.ResultType, cfg.ResultSets.[0]) }
if cfg.SingleRow then
let t = Unsafe.uply {
@@ -271,7 +274,7 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design
else
mapTask (xs, executionType)
- static member private ReadResultSet (cursor: Common.DbDataReader, resultSetDefinition, cfg) =
+ static member private ReadResultSet (cursor: Common.DbDataReader, resultSetDefinition, retryEvent, cfg) =
ISqlCommandImplementation.VerifyOutputColumns(cursor, resultSetDefinition.ExpectedColumns)
let func =
@@ -283,16 +286,16 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design
typeof
.GetMethod(nameof ISqlCommandImplementation.ExecuteSingle, BindingFlags.NonPublic ||| BindingFlags.Static)
.MakeGenericMethod(resultSetDefinition.ErasedRowType)
- .Invoke(null, [||]) :?> Func<_, _, _, Ply.Ply>
+ .Invoke(null, [||]) :?> Func<_, _, _, _, Ply.Ply>
executeSingleCache.[resultSetDefinition.ErasedRowType] <- func
func
- func.Invoke (cursor, resultSetDefinition, cfg)
+ func.Invoke (cursor, resultSetDefinition, retryEvent, cfg)
- static member internal AsyncExecuteMulti (cfg, cmd, connection, parameters, executionType) =
+ static member internal AsyncExecuteMulti (retryEvent, cfg, cmd, connection, parameters, executionType) =
let t = Unsafe.uply {
- use! cursor = ISqlCommandImplementation.AsyncExecuteDataReaderTask (cfg, cmd, connection, parameters)
+ use! cursor = ISqlCommandImplementation.AsyncExecuteDataReaderTask (retryEvent, cfg, cmd, connection, parameters)
let results = Array.zeroCreate cmd.Statements.Count
// Command contains at least one query
@@ -301,9 +304,9 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design
while go do
let currentStatement = GetStatementIndex.Invoke cursor
- let! res = ISqlCommandImplementation.ReadResultSet (cursor, cfg.ResultSets.[currentStatement], cfg)
+ let! res = ISqlCommandImplementation.ReadResultSet (cursor, cfg.ResultSets.[currentStatement], retryEvent, cfg)
results.[currentStatement] <- res
- let! more = cursor.NextResultAsync ()
+ let! more = Utils.NextResultAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cursor)
go <- more
ISqlCommandImplementation.SetNumberOfAffectedRows (results, cmd.Statements)
@@ -311,17 +314,17 @@ type ISqlCommandImplementation (commandNameHash: int, cfgBuilder: unit -> Design
mapTask (t, executionType)
- static member internal AsyncExecuteNonQuery (cfg, cmd, connection, parameters, executionType) =
+ static member internal AsyncExecuteNonQuery (retryEvent, cfg, cmd, connection, parameters, executionType) =
let t = Unsafe.uply {
ISqlCommandImplementation.SetParameters (cmd, parameters)
- do! setupConnection (cmd, connection)
+ do! Utils.SetupConnectionAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cmd, connection)
let readerBehavior = getReaderBehavior (connection, cfg)
use _ = if readerBehavior.HasFlag CommandBehavior.CloseConnection then cmd.Connection else null
if cfg.Prepare then
- do! cmd.PrepareAsync ()
+ do! Utils.PrepareAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cmd)
- return! cmd.ExecuteNonQueryAsync () }
+ return! Utils.ExecuteNonQueryAsync (cfg.Tries, cfg.RetryWaitTime, retryEvent, cmd) }
mapTask (t, executionType)
diff --git a/src/Runtime/Runtime.fsproj b/src/Runtime/Runtime.fsproj
index a108e4c..4b5b2af 100644
--- a/src/Runtime/Runtime.fsproj
+++ b/src/Runtime/Runtime.fsproj
@@ -9,7 +9,7 @@
true
true
true
- true
+ false
101
preview
diff --git a/src/Runtime/Utils.fs b/src/Runtime/Utils.fs
index d342857..ca4c6fe 100644
--- a/src/Runtime/Utils.fs
+++ b/src/Runtime/Utils.fs
@@ -5,15 +5,156 @@ open System.Data
open System.Data.Common
open System.Collections.Concurrent
open System.ComponentModel
+open System.Linq.Expressions
open Npgsql
open NpgsqlTypes
open FSharp.Control.Tasks.NonAffine
-open System.Linq.Expressions
#nowarn "0025"
+[]
+module internal LocalExtensions =
+
+ type String with
+
+ member this.ErrorClass =
+ if this.Length >= 2
+ then this.Substring 2
+ else raise (InvalidOperationException ())
+
+[]
+module internal Retry =
+
+ let ShouldRetry (triesCurrent, triesMax) =
+ triesMax <= 0 || triesCurrent < triesMax
+
+ let ShouldRetryWithConnection (triesCurrent, triesMax, connection: NpgsqlConnection) =
+ ShouldRetry (triesCurrent, triesMax) &&
+ (connection.State &&& ConnectionState.Open = ConnectionState.Open)
+
+ let rec ShouldRetryException (exn: Exception) =
+ match exn with
+ | :? PostgresException as pgexn ->
+ let sqlState = pgexn.SqlState
+ let errorClass = sqlState.ErrorClass
+ if sqlState = PostgresErrorCodes.IoError ||
+ sqlState = PostgresErrorCodes.DeadlockDetected ||
+ sqlState = PostgresErrorCodes.LockNotAvailable ||
+ sqlState = PostgresErrorCodes.TransactionIntegrityConstraintViolation ||
+ sqlState = PostgresErrorCodes.InFailedSqlTransaction ||
+ sqlState = PostgresErrorCodes.TooManyConnections ||
+ errorClass = PostgresErrorCodes.ConnectionException.ErrorClass ||
+ errorClass = PostgresErrorCodes.InsufficientResources.ErrorClass then
+ true
+ else false
+ | :? NpgsqlException ->
+ true
+ | :? AggregateException as aggexn ->
+ Seq.forall ShouldRetryException aggexn.InnerExceptions
+ | _ -> false
+
+[]
+module internal Async =
+
+ let CatchDb a =
+ async {
+ try
+ let! result = a
+ return Choice1Of2 result
+ with exn when Retry.ShouldRetryException exn ->
+ return Choice2Of2 exn }
+
[]
type Utils () =
+
+ static let rec LoadDataTable' (triesCurrent, exns, triesMax, retryWaitTime: int, retryEvent: Event, cursor, cmd: NpgsqlCommand, result: DataRow DataTable) =
+ try result.Load cursor
+ with exn when Retry.ShouldRetryException exn ->
+ if Retry.ShouldRetryWithConnection (triesCurrent, triesMax, cmd.Connection) then
+ // NOTE: doing a Thread.Sleep here doesn't help.
+ // I am not convinced this code is meant to be run parallel.
+ retryEvent.Trigger exn
+ LoadDataTable' (triesCurrent + 1, exn :: exns, triesMax, retryWaitTime, retryEvent, cursor, cmd, result)
+ else raise (AggregateException (Seq.rev exns))
+
+ static let rec SetupConnectionAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, cmd: NpgsqlCommand, connection) =
+ async {
+ match connection with
+ | Choice1Of2 connectionString ->
+ cmd.Connection <- new NpgsqlConnection (connectionString)
+ let! choice = cmd.Connection.OpenAsync () |> Async.AwaitTask |> Async.CatchDb
+ match choice with
+ | Choice1Of2 () -> ()
+ | Choice2Of2 exn ->
+ if Retry.ShouldRetry (triesCurrent, triesMax) then
+ do! Async.Sleep retryWaitTime
+ retryEvent.Trigger exn
+ do! SetupConnectionAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, cmd, connection)
+ else return raise (AggregateException (Seq.rev exns))
+ | Choice2Of2 (conn, tx) ->
+ cmd.Connection <- conn
+ cmd.Transaction <- tx }
+
+ static let rec ReadAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, cursor: DbDataReader) =
+ async {
+ let! choice = cursor.ReadAsync () |> Async.AwaitTask |> Async.CatchDb
+ match choice with
+ | Choice1Of2 go -> return go
+ | Choice2Of2 exn ->
+ if Retry.ShouldRetry (triesCurrent, triesMax) then
+ do! Async.Sleep retryWaitTime
+ retryEvent.Trigger exn
+ return! ReadAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, cursor)
+ else return raise (AggregateException (Seq.rev exns)) }
+
+ static let rec NextResultAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, cursor: DbDataReader) =
+ async {
+ let! choice = cursor.NextResultAsync () |> Async.AwaitTask |> Async.CatchDb
+ match choice with
+ | Choice1Of2 go -> return go
+ | Choice2Of2 exn ->
+ if Retry.ShouldRetry (triesCurrent, triesMax) then
+ do! Async.Sleep retryWaitTime
+ retryEvent.Trigger exn
+ return! NextResultAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, cursor)
+ else return raise (AggregateException (Seq.rev exns)) }
+
+ static let rec PrepareAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, cmd: NpgsqlCommand) =
+ async {
+ let! choice = cmd.PrepareAsync () |> Async.AwaitTask |> Async.CatchDb
+ match choice with
+ | Choice1Of2 () -> return ()
+ | Choice2Of2 exn ->
+ if Retry.ShouldRetryWithConnection (triesCurrent, triesMax, cmd.Connection) then
+ do! Async.Sleep retryWaitTime
+ retryEvent.Trigger exn
+ return! PrepareAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, cmd)
+ else return raise (AggregateException (Seq.rev exns)) }
+
+ static let rec ExecuteReaderAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, behavior: CommandBehavior, cmd: NpgsqlCommand) =
+ async {
+ let! choice = cmd.ExecuteReaderAsync behavior |> Async.AwaitTask |> Async.CatchDb
+ match choice with
+ | Choice1Of2 reader -> return reader
+ | Choice2Of2 exn ->
+ if Retry.ShouldRetryWithConnection (triesCurrent, triesMax, cmd.Connection) then
+ do! Async.Sleep retryWaitTime
+ retryEvent.Trigger exn
+ return! ExecuteReaderAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, behavior, cmd)
+ else return raise (AggregateException (Seq.rev exns)) }
+
+ static let rec ExecuteNonQueryAsync' (triesCurrent, exns, triesMax, retryWaitTime, retryEvent: Event, cmd: NpgsqlCommand) =
+ async {
+ let! choice = cmd.ExecuteNonQueryAsync () |> Async.AwaitTask |> Async.CatchDb
+ match choice with
+ | Choice1Of2 rowsAffected -> return rowsAffected
+ | Choice2Of2 exn ->
+ if Retry.ShouldRetryWithConnection (triesCurrent, triesMax, cmd.Connection) then
+ do! Async.Sleep retryWaitTime
+ retryEvent.Trigger exn
+ return! ExecuteNonQueryAsync' (triesCurrent+1, exn :: exns, triesMax, retryWaitTime, retryEvent, cmd)
+ else return raise (AggregateException (Seq.rev exns)) }
+
static let getColumnMapping =
let cache = ConcurrentDictionary obj> ()
let factory = Func<_, _>(fun (typeParam: Type) ->
@@ -78,6 +219,33 @@ type Utils () =
cache.[resultSet.ExpectedColumns.GetHashCode ()] <- func
func
+ static member LoadDataTable (tries, retryWaitTime, retryEvent, cursor, cmd, result) =
+ LoadDataTable' (0, [], tries, retryWaitTime, retryEvent, cursor, cmd, result)
+
+ static member SetupConnectionAsync (tries, retryWaitTime, retryEvent, cmd, connection) =
+ async {
+ do! SetupConnectionAsync' (0, [], tries, retryWaitTime, retryEvent, cmd, connection) }
+
+ static member ReadAsync (tries, retryWaitTime, retryEvent, cursor) =
+ async {
+ return! ReadAsync' (0, [], tries, retryWaitTime, retryEvent, cursor) }
+
+ static member NextResultAsync (tries, retryWaitTime, retryEvent, cursor) =
+ async {
+ return! NextResultAsync' (0, [], tries, retryWaitTime, retryEvent, cursor) }
+
+ static member PrepareAsync (tries, retryWaitTime, retryEvent, cmd) =
+ async {
+ return! PrepareAsync' (0, [], tries, retryWaitTime, retryEvent, cmd) }
+
+ static member ExecuteReaderAsync (tries, retryWaitTime, retryEvent, behavior, cmd) =
+ async {
+ return! ExecuteReaderAsync' (0, [], tries, retryWaitTime, retryEvent, behavior, cmd) }
+
+ static member ExecuteNonQueryAsync (tries, retryWaitTime, retryEvent, cmd) =
+ async {
+ return! ExecuteNonQueryAsync' (0, [], tries, retryWaitTime, retryEvent, cmd) }
+
static member ResizeArrayToList ra =
let rec inner (ra: ResizeArray<'a>, index, acc) =
if index = 0 then
@@ -186,11 +354,11 @@ type Utils () =
let [| columnName; typeName; nullable |] = stringValues.Split '|'
new DataColumn (columnName, Utils.GetType typeName, AllowDBNull = (nullable = "1"))
- static member MapRowValuesOntoTuple<'TItem> (cursor: DbDataReader, resultType, resultSet) = Unsafe.uply {
+ static member MapRowValuesOntoTuple<'TItem> (tries, retryWaitTime, retryEvent, cursor: DbDataReader, resultType, resultSet) = Unsafe.uply {
let results = ResizeArray<'TItem> ()
let rowReader = getRowToTupleReader resultSet (resultType = ResultType.Records)
- let! go = cursor.ReadAsync ()
+ let! go = Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor)
let mutable go = go
while go do
@@ -198,27 +366,27 @@ type Utils () =
|> unbox
|> results.Add
- let! cont = cursor.ReadAsync ()
+ let! cont = Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor)
go <- cont
return results }
- static member MapRowValuesOntoTupleLazy<'TItem> (cursor: DbDataReader, resultType, resultSet) =
+ static member MapRowValuesOntoTupleLazy<'TItem> (tries, retryWaitTime, retryEvent, cursor: DbDataReader, resultType, resultSet) =
seq {
let rowReader = getRowToTupleReader resultSet (resultType = ResultType.Records)
- while cursor.Read () do
+ while Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor) |> Async.RunSynchronously do
rowReader.Invoke cursor |> unbox<'TItem>
}
- static member MapRowValues<'TItem> (cursor: DbDataReader, resultType, resultSet: ResultSetDefinition) =
+ static member MapRowValues<'TItem> (tries, retryWaitTime, retryEvent, cursor: DbDataReader, resultType, resultSet: ResultSetDefinition) =
if resultSet.ExpectedColumns.Length > 1 then
- Utils.MapRowValuesOntoTuple<'TItem> (cursor, resultType, resultSet)
+ Utils.MapRowValuesOntoTuple<'TItem> (tries, retryWaitTime, retryEvent, cursor, resultType, resultSet)
else Unsafe.uply {
let columnMapping = getColumnMapping resultSet.ExpectedColumns.[0]
let results = ResizeArray<'TItem> ()
- let! go = cursor.ReadAsync ()
+ let! go = Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor)
let mutable go = go
while go do
@@ -227,16 +395,16 @@ type Utils () =
|> unbox
|> results.Add
- let! cont = cursor.ReadAsync ()
+ let! cont = Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor)
go <- cont
return results }
- static member MapRowValuesLazy<'TItem> (cursor: DbDataReader, resultSet) =
+ static member MapRowValuesLazy<'TItem> (tries, retryWaitTime, retryEvent, cursor: DbDataReader, resultSet) =
seq {
let columnMapping = getColumnMapping resultSet.ExpectedColumns.[0]
- while cursor.Read () do
+ while Utils.ReadAsync (tries, retryWaitTime, retryEvent, cursor) |> Async.RunSynchronously do
cursor.GetValue 0
|> columnMapping
|> unbox<'TItem>
diff --git a/tests/NpgsqlConnectionTests.fs b/tests/NpgsqlConnectionTests.fs
index 31db387..0482c87 100644
--- a/tests/NpgsqlConnectionTests.fs
+++ b/tests/NpgsqlConnectionTests.fs
@@ -2,8 +2,9 @@ module NpgsqlConnectionTests
open System
open Xunit
-open FSharp.Data.Npgsql
open System.Reflection
+open System.Threading.Tasks
+open FSharp.Data.Npgsql
open type Npgsql.NpgsqlNetTopologySuiteExtensions
open NetTopologySuite.Geometries
@@ -151,6 +152,89 @@ let paramInLimit() =
[]
let getRentalById = "SELECT return_date FROM rental WHERE rental_id = @id"
+[]
+let retry () =
+ let chc =
+ seq {
+ for _ in 1 .. 10 do
+ let connectionStrWithIncorrectPort = "Host=localhost;Username=postgres;Password=postgres;Database=dvdrental;Port=1313"
+ let cmd = DvdRental.CreateCommand<"SELECT * FROM rental", ResultType.DataTable, Tries = 5> connectionStrWithIncorrectPort
+ yield async {
+ let! result = cmd.AsyncExecute ()
+ (cmd :> IDisposable).Dispose ()
+ return result }}
+ |> Async.Parallel
+ |> Async.Catch
+ |> Async.RunSynchronously
+ let isExpectedAggregateException =
+ match chc with
+ | Choice2Of2 exn ->
+ match exn with
+ | :? AggregateException as aggexn ->
+ match aggexn.InnerException with
+ | :? AggregateException as aggexn2 ->
+ match aggexn2.InnerException with
+ | :? AggregateException as aggexn3 ->
+ match aggexn3.InnerException with
+ | :? Npgsql.NpgsqlException -> true
+ | _ -> false
+ | _ -> false
+ | _ -> false
+ | _ -> false
+ | Choice1Of2 _ -> false
+ Assert.True isExpectedAggregateException
+
+exception ContrivedRetryException of unit
+
+[]
+let retryEvent () =
+ let chc =
+ seq {
+ for _ in 1 .. 10 do
+ let connectionStrWithIncorrectPort = "Host=localhost;Username=postgres;Password=postgres;Database=dvdrental;Port=1313"
+ let cmd = DvdRental.CreateCommand<"SELECT * FROM rental", ResultType.DataTable, Tries = 5> connectionStrWithIncorrectPort
+ cmd.add_RetryEvent (fun _ _ -> raise (ContrivedRetryException ()))
+ yield async {
+ let! result = cmd.AsyncExecute ()
+ (cmd :> IDisposable).Dispose ()
+ return result }}
+ |> Async.Parallel
+ |> Async.Catch
+ |> Async.RunSynchronously
+ let isExpectedContrivedException =
+ match chc with
+ | Choice2Of2 exn ->
+ match exn with
+ | :? AggregateException as aggexn ->
+ match aggexn.InnerException with
+ | :? ContrivedRetryException -> true
+ | _ -> false
+ | _ -> false
+ | Choice1Of2 _ -> false
+ Assert.True isExpectedContrivedException
+
+type DvdRental' = NpgsqlConnection<"Host=localhost;Username=postgres;Password=postgres;Database=dvdrental;Port=5432", MethodTypes = methodTypes, AsyncChoice = true>
+
+[]
+let retryAsyncChoice () =
+ let chc =
+ seq {
+ for _ in 1 .. 10 do
+ let connectionStrWithIncorrectPort = "Host=localhost;Username=postgres;Password=postgres;Database=dvdrental;Port=1313"
+ let cmd = DvdRental'.CreateCommand<"SELECT * FROM rental", ResultType.DataTable, Tries = 5> connectionStrWithIncorrectPort
+ yield async {
+ let! result = cmd.AsyncExecute ()
+ (cmd :> IDisposable).Dispose ()
+ return match result with Choice1Of2 r -> r | Choice2Of2 _ -> raise (ContrivedRetryException ()) }}
+ |> Async.Parallel
+ |> Async.Catch
+ |> Async.RunSynchronously
+ let isExpectedContrivedException =
+ match chc with
+ | Choice2Of2 exn -> match exn with :? ContrivedRetryException -> true | _ -> false
+ | Choice1Of2 _ -> false
+ Assert.True isExpectedContrivedException
+
[]
let dateTableWithUpdate() =