Postgres-async-driver is a non-blocking Java driver for PostgreSQL. The driver supports connection pooling, prepared statements, transactions, timeouts, back-pressure, all standard SQL types, and custom column types.
This is a fork of Antti Laisi's PostgreSQL asynchronous driver. Some refactoring has been applied in order to implement back-pressure (currently only at the TCP level) and timeouts.
Postgres-async-driver is available on Maven Central.
    <dependency>
        <groupId>com.github.jaceksokol</groupId>
        <artifactId>postgres-async-driver</artifactId>
        <version>0.1.0</version>
    </dependency>

Db is a connection pool that is created with com.github.pgasync.ConnectionPoolBuilder.
    Db db = new ConnectionPoolBuilder()
        .hostname("localhost")
        .port(5432)
        .database("db")
        .username("user")
        .password("pass")
        .poolSize(20)
        .connectTimeout(1, TimeUnit.SECONDS)
        .statementTimeout(10, TimeUnit.SECONDS)
        .build();

Each connection pool starts only one IO thread, which is used for communicating with the PostgreSQL backend and for executing callbacks for all connections.
Querying for a set returns an rx.Observable that emits a single ResultSet. This method does not support back-pressure.
    db.querySet("select 'Hello world!' as message")
        .map(result -> result.row(0).getString("message"))
        .subscribe(System.out::println);
    // => Hello world!

Querying for rows returns an rx.Observable that emits 0-n Rows. The rows are emitted as soon as they are received from the server instead of waiting for the entire query to complete. This method supports back-pressure.
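What back-pressure means for a consumer can be sketched with the JDK's own java.util.concurrent.Flow API (Java 9+), used here only as a stand-in for the driver's rx.Observable. The subscriber asks for one item at a time, so the publisher never runs ahead of it. The class and method names below are illustrative assumptions and are not part of the driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class BackPressureSketch {

    /** Synchronous publisher that hands out items only while the subscriber has outstanding demand. */
    static final class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;
        private final List<String> log;

        ListPublisher(List<String> items, List<String> log) {
            this.items = items;
            this.log = log;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Simplification: non-positive requests are ignored instead of signalling onError.
                    if (done || n <= 0) return;
                    log.add("request(" + n + ")");
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext; the loop below serves it
                    emitting = true;
                    while (demand > 0 && next < items.size() && !done) {
                        demand--;
                        subscriber.onNext(items.get(next++));
                    }
                    emitting = false;
                    if (next == items.size() && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Consumes the rows one at a time and returns the sequence of signals that were exchanged. */
    static List<String> run(List<String> rows) {
        List<String> log = new ArrayList<>();
        new ListPublisher(rows, log).subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // initial demand: exactly one row
            }

            @Override
            public void onNext(String row) {
                log.add("onNext(" + row + ")");
                subscription.request(1); // ask for the next row only after handling this one
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(List.of("hello", "world")).forEach(System.out::println);
    }
}
```

Every onNext in the resulting log is preceded by a matching request(1), which is the contract that lets a slow consumer throttle the producer. With the driver itself, rows are streamed through queryRows: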
    db.queryRows("select unnest('{ hello, world }'::text[]) as message")
        .map(row -> row.getString("message"))
        .subscribe(System.out::println);
    // => hello
    // => world

Prepared statements use the native PostgreSQL syntax $index. Supported parameter types are all primitive types, String, BigDecimal, BigInteger, UUID, temporal types in the java.sql package, and byte[].
    db.querySet("insert into message(id, body) values($1, $2)", 123, "hello")
        .subscribe(result -> System.out.printf("Inserted %d rows", result.updatedRows()));

A transactional unit of work is started with begin(). Queries issued to the emitted Transaction are executed in the same transaction, and the transaction is automatically rolled back on query failure.
    db.begin()
        .flatMap(tx -> tx.querySet("insert into products (name) values ($1) returning id", "saw")
            .map(productsResult -> productsResult.row(0).getLong("id"))
            .flatMap(id -> tx.querySet("insert into promotions (product_id) values ($1)", id))
            .flatMap(promotionsResult -> tx.commit())
        ).subscribe(
            __ -> System.out.println("Transaction committed"),
            Throwable::printStackTrace
        );

You can set a default statement timeout for the ConnectionPool and, additionally, a timeout per query.
    db.withTimeout(1, TimeUnit.SECONDS).queryRows("select * from events").subscribe();

Support for additional data types requires registering converters with com.github.pgasync.ConnectionPoolBuilder.
    class JsonConverter implements Converter<example.Json> {
        @Override
        public Class<example.Json> type() {
            return example.Json.class;
        }
        @Override
        public byte[] from(example.Json json) {
            return json.toBytes();
        }
        @Override
        public example.Json to(Oid oid, byte[] value) {
            return new example.Json(new String(value, UTF_8));
        }
    }

    Db db = new ConnectionPoolBuilder()
        // ...
        .converters(new JsonConverter())
        .build();
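The JsonConverter example relies on an application-defined example.Json type that is not shown. A minimal sketch of such a type follows, assuming it only needs to wrap the JSON text and expose the toBytes() method and String constructor that the converter calls. The class is hypothetical, is not shipped with the driver, and is placed in the default package here only to keep the snippet self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/** Hypothetical stand-in for example.Json: an immutable wrapper around JSON text. */
public final class Json {

    private final String text;

    public Json(String text) {
        this.text = Objects.requireNonNull(text, "text");
    }

    /** UTF-8 encoding of the JSON text, i.e. what a converter's from() would hand to the driver. */
    public byte[] toBytes() {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof Json && ((Json) other).text.equals(text);
    }

    @Override
    public int hashCode() {
        return text.hashCode();
    }

    @Override
    public String toString() {
        return text;
    }

    public static void main(String[] args) {
        // Round trip that mirrors the converter: object -> bytes -> object.
        Json original = new Json("{\"greeting\":\"hello\"}");
        Json restored = new Json(new String(original.toBytes(), StandardCharsets.UTF_8));
        System.out.println(original.equals(restored));
    }
}
```

Keeping the type immutable and giving it value-based equals() makes the round trip through from() and to() easy to verify in a unit test without a database.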