Skip to content

Commit e0d49a3

Browse files
committed
Update getMqttClient and Add getSubTopic, getSubQos, getUnsubscribe
1 parent 64399fa commit e0d49a3

File tree

2 files changed

+34
-5
lines changed

2 files changed

+34
-5
lines changed

src/Handler/AbstractHandler.php

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,29 @@ public function getTopic(): string
9797
return (string) $this->input->getOption('topic');
9898
}
9999

100+
public function getSubTopic(): array
101+
{
102+
return $this->input->getOption('topic');
103+
}
104+
105+
public function getSubQos(): array
106+
{
107+
$qos = $this->input->getOption('qos');
108+
109+
$subQos = [];
110+
if ($this->getProtocolLevel() === ProtocolInterface::MQTT_PROTOCOL_LEVEL_5_0) {
111+
foreach ($qos as $item) {
112+
$subQos[] = ['qos' => (int) $item];
113+
}
114+
} else {
115+
foreach ($qos as $item) {
116+
$subQos[] = (int) $item;
117+
}
118+
}
119+
120+
return $subQos;
121+
}
122+
100123
public function getMessage(): string
101124
{
102125
return (string) $this->input->getOption('message');
@@ -153,9 +176,9 @@ public function getConnectConfig(): ClientConfig
153176
->setMaxAttempts(0); // Disable auto reconnection
154177
}
155178

156-
public function getMqttClient(): Client
179+
public function getMqttClient(string $host, int $port, ClientConfig $config): Client
157180
{
158-
return new Client($this->getHost(), $this->getPort(), $this->getConnectConfig());
181+
return new Client($host, $port, $config);
159182
}
160183

161184
public function genWillData(): array
@@ -201,6 +224,11 @@ public function getProperties(string $key = ''): array
201224
return [];
202225
}
203226

227+
public function getUnsubscribe(): array
228+
{
229+
return $this->input->getOption('unsubscribe');
230+
}
231+
204232
protected function log($msg): void
205233
{
206234
$date = date('Y-m-d H:i:s');

src/Handler/PublishHandler.php

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@ public function handle(InputInterface $input, OutputInterface $output): int
3434
}
3535

3636
try {
37-
$client = $this->getMqttClient();
37+
$config = $this->getConnectConfig();
38+
$client = $this->getMqttClient($this->getHost(), $this->getPort(), $config);
3839
$connect = $client->connect($this->getCleanSession(), $this->genWillData());
3940
$this->logInfo("Connect {$this->getHost()} successfully, recv: ");
4041
$this->log(json_encode($connect));
41-
if ($this->getConnectConfig()->isMQTT5()) {
42+
if ($config->isMQTT5()) {
4243
$this->logInfo("Connect Reason Code: {$connect['code']}, Reason: " . ReasonCode::getReasonPhrase($connect['code']));
4344
}
4445
$publish = $client->publish($topic, $message, $this->getQos(), $this->getDup(), $this->getRetain(), $this->getProperties('publish'));
@@ -50,7 +51,7 @@ public function handle(InputInterface $input, OutputInterface $output): int
5051
if (is_array($publish)) {
5152
$this->logInfo("Publish message '{$message}' to '{$topic}', recv: ");
5253
$this->log(json_encode($publish));
53-
if ($this->getConnectConfig()->isMQTT5()) {
54+
if ($config->isMQTT5()) {
5455
$this->logInfo("Publish Reason Code: {$publish['code']}, Reason: " . ReasonCode::getReasonPhrase($publish['code']));
5556
if ($publish['code']) {
5657
goto failure;

0 commit comments

Comments
 (0)