Skip to content

Commit bfafe0e

Browse files
committed
Add unsubscribe
1 parent 8542d4c commit bfafe0e

File tree

2 files changed

+92
-3
lines changed

2 files changed

+92
-3
lines changed

README.md

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,76 @@ Options:
5858
Help:
5959
An MQTT version 3.1/3.1.1/5.0 client for publishing simple messages
6060
```
61+
62+
### Subscribe
63+
64+
```bash
65+
$ php vendor/bin/mqtt subscribe --help
66+
Description:
67+
Subscribing to topics
68+
69+
Usage:
70+
subscribe [options]
71+
72+
Options:
73+
-H, --host[=HOST] Specify the host to connect to [default: "localhost"]
74+
-P, --port[=PORT] Connect to the port specified [default: 1883]
75+
-i, --id[=ID] The id to use for this client [default: ""]
76+
--qos=QOS Specify the quality of service to use for the message, from 0, 1 and 2 (multiple values allowed)
77+
-t, --topic=TOPIC The MQTT topic to subscribe to (multiple values allowed)
78+
-u, --username[=USERNAME] Provide a username to be used for authenticating with the broker
79+
-p, --pw[=PW] Provide a password to be used for authenticating with the broker
80+
-c, --clean-session[=CLEAN-SESSION] Setting the 'clean session' flag [default: true]
81+
-l, --level=LEVEL MQTT Protocol level [default: 4]
82+
-k, --keepalive[=KEEPALIVE] The number of seconds between sending PING commands to the broker for the purposes of informing it we are still connected and functioning [default: 0]
83+
--will-topic[=WILL-TOPIC] The topic on which to send a Will, in the event that the client disconnects unexpectedly
84+
--will-message[=WILL-MESSAGE] Specify a message that will be stored by the broker and sent out if this client disconnects unexpectedly
85+
--will-qos[=WILL-QOS] The QoS to use for the Will [default: 0]
86+
--will-retain[=WILL-RETAIN] If given, if the client disconnects unexpectedly the message sent out will be treated as a retained message [default: 0]
87+
-S, --ssl[=SSL] Enable SSL encryption [default: false]
88+
--config-path[=CONFIG-PATH] Setting the Swoole config file path
89+
--properties-path[=PROPERTIES-PATH] Setting the Properties config file path
90+
-U, --unsubscribe[=UNSUBSCRIBE] Topics that need to be unsubscribed (multiple values allowed)
91+
-h, --help Display help for the given command. When no command is given display help for the list command
92+
-q, --quiet Do not output any message
93+
-V, --version Display this application version
94+
--ansi Force ANSI output
95+
--no-ansi Disable ANSI output
96+
-n, --no-interaction Do not ask any interactive question
97+
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
98+
99+
Help:
100+
An MQTT version 3.1/3.1.1/5.0 client for subscribing to topics
101+
```
102+
103+
### Path
104+
105+
There are two config: `--config-path` and `--properties-path`, you need to specify the path to the config file.
106+
107+
- `--config-path`
108+
109+
```php
110+
// config.php
111+
112+
return [
113+
'open_mqtt_protocol' => true,
114+
'package_max_length' => 2 * 1024 * 1024,
115+
];
116+
```
117+
118+
- `--properties-path`
119+
120+
```php
121+
// properties.php
122+
123+
return [
124+
'publish' => [
125+
'topic_alias' => 1,
126+
'message_expiry_interval' => 12,
127+
],
128+
'will' => [
129+
'will_delay_interval' => 60,
130+
'message_expiry_interval' => 60,
131+
],
132+
];
133+
```

src/Handler/SubscribeHandler.php

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,26 @@ public function handle(InputInterface $input, OutputInterface $output): int
6363
if ($config->isMQTT5()) {
6464
$this->logInfo("Connect Reason Code: {$connect['code']}, Reason: " . ReasonCode::getReasonPhrase($connect['code']));
6565
}
66-
// TODO: unsubscribe
66+
67+
// unsubscribe
68+
$un_subscribe = $this->getUnsubscribe();
69+
if (!empty($un_subscribe)) {
70+
$un_subscribe_res = $client->unSubscribe($un_subscribe, $this->getProperties('unsubscribe'));
71+
$this->log(json_encode($un_subscribe_res));
72+
73+
$unsub_ack_data = array_combine($topic, $un_subscribe_res['codes']);
74+
foreach ($unsub_ack_data as $key => $code) {
75+
$this->logInfo("UnSubscribe [{$key}], Reason Code: {$code}, Reason: " . ReasonCode::getReasonPhrase($code));
76+
}
77+
78+
if (empty($subscribe)) {
79+
return Command::SUCCESS;
80+
}
81+
}
6782

6883
// subscribe
6984
$sub_ack = $client->subscribe($subscribe, $this->getProperties('subscribe'));
7085
$this->log(json_encode($sub_ack));
71-
7286
if (is_array($sub_ack)) {
7387
if ($sub_ack['type'] === Types::SUBACK) {
7488
$sub_ack_data = array_combine($topic, $sub_ack['codes']);
@@ -77,6 +91,7 @@ public function handle(InputInterface $input, OutputInterface $output): int
7791
}
7892
}
7993
if (isset($sub_ack['code'])) {
94+
$client->close();
8095
$this->logError('Subscribe error, ' . ReasonCode::getReasonPhrase($sub_ack['code']));
8196
goto failure;
8297
}
@@ -87,7 +102,8 @@ public function handle(InputInterface $input, OutputInterface $output): int
87102
$buffer = $client->recv();
88103
if ($buffer && $buffer !== true) {
89104
$this->log(json_encode($buffer));
90-
// need event
105+
// TODO: need event
106+
// $client $this->input $this->output
91107
}
92108
if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) {
93109
$buffer = $client->ping();

0 commit comments

Comments
 (0)