Skip to content

Commit 168cc3e

Browse files
committed
Add SubscribeCommand and SubscribeHandler
1 parent e0d49a3 commit 168cc3e

File tree

2 files changed

+167
-0
lines changed

2 files changed

+167
-0
lines changed

src/Command/SubscribeCommand.php

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
/**
3+
* This file is part of Simps
4+
*
5+
* @link https://github.com/simps/mqtt
6+
* @contact Lu Fei <lufei@simps.io>
7+
*
8+
* For the full copyright and license information,
9+
* please view the LICENSE file that was distributed with this source code
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Simps\MQTTCLI\Command;
15+
16+
use Simps\MQTTCLI\Handler\SubscribeHandler;
17+
use Symfony\Component\Console\Command\Command;
18+
use Symfony\Component\Console\Input\InputDefinition;
19+
use Symfony\Component\Console\Input\InputInterface;
20+
use Symfony\Component\Console\Input\InputOption;
21+
use Symfony\Component\Console\Output\OutputInterface;
22+
23+
class SubscribeCommand extends Command
24+
{
25+
protected static $defaultName = 'subscribe';
26+
27+
protected function configure()
28+
{
29+
$this->setDescription('Subscribing to topics')
30+
->setHelp('An MQTT version 3.1/3.1.1/5.0 client for subscribing to topics')
31+
->setDefinition(
32+
new InputDefinition([
33+
new InputOption('host', 'H', InputOption::VALUE_OPTIONAL, 'Specify the host to connect to', 'localhost'),
34+
new InputOption('port', 'P', InputOption::VALUE_OPTIONAL, 'Connect to the port specified', 1883),
35+
new InputOption('id', 'i', InputOption::VALUE_OPTIONAL, 'The id to use for this client', ''),
36+
new InputOption('qos', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Specify the quality of service to use for the message, from 0, 1 and 2'),
37+
new InputOption('topic', 't', InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'The MQTT topic to subscribe to'),
38+
new InputOption('username', 'u', InputOption::VALUE_OPTIONAL, 'Provide a username to be used for authenticating with the broker'),
39+
new InputOption('pw', 'p', InputOption::VALUE_OPTIONAL, 'Provide a password to be used for authenticating with the broker'),
40+
new InputOption('clean-session', 'c', InputOption::VALUE_OPTIONAL, "Setting the 'clean session' flag", true),
41+
new InputOption('level', 'l', InputOption::VALUE_REQUIRED, 'MQTT Protocol level', 4),
42+
new InputOption('keepalive', 'k', InputOption::VALUE_OPTIONAL, 'The number of seconds between sending PING commands to the broker for the purposes of informing it we are still connected and functioning', 0),
43+
new InputOption('will-topic', null, InputOption::VALUE_OPTIONAL, 'The topic on which to send a Will, in the event that the client disconnects unexpectedly'),
44+
new InputOption('will-message', null, InputOption::VALUE_OPTIONAL, 'Specify a message that will be stored by the broker and sent out if this client disconnects unexpectedly'),
45+
new InputOption('will-qos', null, InputOption::VALUE_OPTIONAL, 'The QoS to use for the Will', 0),
46+
new InputOption('will-retain', null, InputOption::VALUE_OPTIONAL, 'If given, if the client disconnects unexpectedly the message sent out will be treated as a retained message', 0),
47+
new InputOption('ssl', 'S', InputOption::VALUE_OPTIONAL, 'Enable SSL encryption', false),
48+
new InputOption('config-path', null, InputOption::VALUE_OPTIONAL, 'Setting the Swoole config file path'),
49+
new InputOption('properties-path', null, InputOption::VALUE_OPTIONAL, 'Setting the Properties config file path'),
50+
new InputOption('unsubscribe', 'U', InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Topics that need to be unsubscribed'),
51+
])
52+
);
53+
}
54+
55+
protected function execute(InputInterface $input, OutputInterface $output)
56+
{
57+
return (new SubscribeHandler())->handle($input, $output);
58+
}
59+
}

src/Handler/SubscribeHandler.php

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
<?php
2+
/**
3+
* This file is part of Simps
4+
*
5+
* @link https://github.com/simps/mqtt
6+
* @contact Lu Fei <lufei@simps.io>
7+
*
8+
* For the full copyright and license information,
9+
* please view the LICENSE file that was distributed with this source code
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Simps\MQTTCLI\Handler;
15+
16+
use Simps\MQTT\Hex\ReasonCode;
17+
use Simps\MQTT\Protocol\ProtocolInterface;
18+
use Simps\MQTT\Protocol\Types;
19+
use Symfony\Component\Console\Command\Command;
20+
use Symfony\Component\Console\Input\InputInterface;
21+
use Symfony\Component\Console\Output\OutputInterface;
22+
23+
class SubscribeHandler extends AbstractHandler
24+
{
25+
public function handle(InputInterface $input, OutputInterface $output): int
26+
{
27+
$this->input = $input;
28+
$this->output = $output;
29+
30+
$topic = $this->getSubTopic();
31+
$qos = $this->getSubQos();
32+
33+
if (empty($topic)) {
34+
$this->logError('The topic you need to subscribe to cannot be empty');
35+
goto failure;
36+
}
37+
38+
$topic_num = count($topic);
39+
$qos_num = count($qos);
40+
$config = $this->getConnectConfig();
41+
42+
if ($topic_num !== 0 && $qos_num === 0) {
43+
$qos_data = ProtocolInterface::MQTT_QOS_0;
44+
if ($config->isMQTT5()) {
45+
$qos_data = ['qos' => $qos_data];
46+
}
47+
$qos = array_fill(0, $topic_num, $qos_data);
48+
$qos_num = $topic_num;
49+
}
50+
51+
if ($topic_num !== $qos_num) {
52+
$this->logError("The number of topics to subscribe to and the number of qos do not match. topic[{$topic_num}], qos[{$qos_num}]");
53+
goto failure;
54+
}
55+
56+
$subscribe = array_combine($topic, $qos);
57+
58+
try {
59+
$client = $this->getMqttClient($this->getHost(), $this->getPort(), $config);
60+
$connect = $client->connect($this->getCleanSession(), $this->genWillData());
61+
$this->logInfo("Connect {$this->getHost()} successfully, recv: ");
62+
$this->log(json_encode($connect));
63+
if ($config->isMQTT5()) {
64+
$this->logInfo("Connect Reason Code: {$connect['code']}, Reason: " . ReasonCode::getReasonPhrase($connect['code']));
65+
}
66+
// TODO: unsubscribe
67+
68+
// subscribe
69+
$sub_ack = $client->subscribe($subscribe, $this->getProperties('subscribe'));
70+
$this->log(json_encode($sub_ack));
71+
72+
if (is_array($sub_ack)) {
73+
if ($sub_ack['type'] === Types::SUBACK) {
74+
$sub_ack_data = array_combine($topic, $sub_ack['codes']);
75+
foreach ($sub_ack_data as $key => $code) {
76+
$this->logInfo("Subscribe [{$key}], Reason Code: {$code}, Reason: " . ReasonCode::getReasonPhrase($code, $code <= ReasonCode::GRANTED_QOS_2));
77+
}
78+
}
79+
if (isset($sub_ack['code'])) {
80+
$this->logError('Subscribe error, ' . ReasonCode::getReasonPhrase($sub_ack['code']));
81+
goto failure;
82+
}
83+
}
84+
85+
$timeSincePing = time();
86+
while (true) {
87+
$buffer = $client->recv();
88+
if ($buffer && $buffer !== true) {
89+
$this->log(json_encode($buffer));
90+
// need event
91+
}
92+
if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) {
93+
$buffer = $client->ping();
94+
if ($buffer) {
95+
$timeSincePing = time();
96+
$this->logInfo('Send ping success');
97+
}
98+
}
99+
}
100+
} catch (\Throwable $e) {
101+
$client->close();
102+
$this->logError("Subscribe error, {$e->getMessage()}");
103+
}
104+
105+
failure:
106+
return Command::FAILURE;
107+
}
108+
}

0 commit comments

Comments
 (0)