Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,35 @@ pub fn main() !void {
}
```

qos consumer with prefetch_count=1 and message no_ack=false:

```zig
const std = @import("std");
const amqp = @import("amqp");

var rx_memory: [4096]u8 = undefined;
var tx_memory: [4096]u8 = undefined;

pub fn main() !void {
var conn = amqp.init(rx_memory[0..], tx_memory[0..]);
const addr = try std.net.Address.parseIp4("127.0.0.1", 5672);
var i: usize = 0;
try conn.connect(addr);

var ch = try conn.channel();
_ = try ch.queueDeclare("qos_consumer", .{}, null);

var couchdb_consumer = try ch.qosConsume("qos_consumer", .{ .prefetch_count = 1 }, .{ .no_ack = false }, null);

while (true) : (i += 1) {
const message = try couchdb_consumer.next();
_ = message.header;
const body = message.body;
// ...do something with the message
try couchdb_consumer.ack(false);
}
```

## Status

The project is alpha with only basic functionality working and almost certainly is not
Expand All @@ -53,10 +82,10 @@ require.
- None...and the binaries are small (other than a server to speak to)

```
➜ zig-amqp git:(master) ✗ zig build-exe src/example.zig -O ReleaseSafe --strip
➜ zig-amqp git:(master) ✗ ldd example
➜ zig-amqp git:(master) ✗ zig build-exe src/example.zig -O ReleaseSafe --strip
➜ zig-amqp git:(master) ✗ ldd example
not a dynamic executable
➜ zig-amqp git:(master) ✗ ls -l example
➜ zig-amqp git:(master) ✗ ls -l example
-rwxr-xr-x. 1 malcolm malcolm 44872 Dec 20 04:26 example
```

Expand Down
16 changes: 15 additions & 1 deletion src/basic.zig
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,19 @@ pub const Basic = struct {
exclusive: bool = false,
no_wait: bool = false,
};
pub const Qos = struct {
prefetch_size: u32 = 0,
prefetch_count: u16 = 1,
global: bool = false,
};
};

pub const Consumer = struct {
connector: Connector,
delivery: proto.Basic.Deliver,

pub fn next(consumer: *Consumer) !Message {
_ = try proto.Basic.awaitDeliver(&consumer.connector);
consumer.delivery = try proto.Basic.awaitDeliver(&consumer.connector);
const header = try consumer.connector.awaitHeader();
const body = try consumer.connector.awaitBody();

Expand All @@ -26,6 +32,14 @@ pub const Basic = struct {
.body = body,
};
}

pub fn ack(consumer: *Consumer, multiple: bool) !void {
_ = try proto.Basic.ackAsync(
&consumer.connector,
consumer.delivery.delivery_tag,
multiple,
);
}
};

pub const Publish = struct {
Expand Down
26 changes: 26 additions & 0 deletions src/channel.zig
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,32 @@ pub const Channel = struct {

return Basic.Consumer{
.connector = channel.connector,
.delivery = undefined,
};
}

pub fn qosConsume(channel: *Channel, name: []const u8, qos_options: Basic.Consume.Qos, options: Basic.Consume.Options, args: ?*Table) !Basic.Consumer {
_ = try proto.Basic.qosSync(
&channel.connector,
qos_options.prefetch_size,
qos_options.prefetch_count,
qos_options.global,
);

_ = try proto.Basic.consumeSync(
&channel.connector,
name,
"",
options.no_local,
options.no_ack,
options.exclusive,
options.no_wait,
args,
);

return Basic.Consumer{
.connector = channel.connector,
.delivery = undefined,
};
}
};