|
|
@ -1,8 +1,9 @@ |
|
|
|
(ns milquetoast.client |
|
|
|
(:require [clojure.core.async :as async :refer [go go-loop <! >!]] |
|
|
|
(:require [clojure.core.async :as async :refer [go go-loop <! >! alts!!]] |
|
|
|
[clojure.data.json :as json]) |
|
|
|
(:import [org.eclipse.paho.client.mqttv3 MqttClient MqttConnectOptions MqttMessage IMqttMessageListener] |
|
|
|
org.eclipse.paho.client.mqttv3.persist.MemoryPersistence)) |
|
|
|
org.eclipse.paho.client.mqttv3.persist.MemoryPersistence |
|
|
|
java.time.Instant)) |
|
|
|
|
|
|
|
(defn- create-mqtt-client! [broker-uri username password] |
|
|
|
(let [client-id (MqttClient/generateClientId) |
|
|
@ -35,7 +36,8 @@ |
|
|
|
(send-message! [_ topic msg opts]) |
|
|
|
(add-channel! [_ chan]) |
|
|
|
(stop! [_]) |
|
|
|
(subscribe-topic! [_ topic opts])) |
|
|
|
(subscribe-topic! [_ topic opts]) |
|
|
|
(get-topic! [_ topic opts])) |
|
|
|
|
|
|
|
(defrecord MilquetoastClient [client open-channels verbose] |
|
|
|
IMilquetoastClient |
|
|
@ -51,15 +53,28 @@ |
|
|
|
(add-channel! [_ chan] |
|
|
|
(swap! open-channels conj chan)) |
|
|
|
(subscribe-topic! [self topic opts] |
|
|
|
(let [{:keys [buffer-size]} opts |
|
|
|
(let [{:keys [buffer-size qos] |
|
|
|
:or {buffer-size 1 qos 0}} opts |
|
|
|
chan (async/chan buffer-size)] |
|
|
|
(add-channel! self chan) |
|
|
|
(.subscribe client topic |
|
|
|
(.subscribe client topic qos |
|
|
|
(proxy [IMqttMessageListener] [] |
|
|
|
(messageArrived [topic mqtt-message] |
|
|
|
(go (>! chan (assoc (parse-message mqtt-message) |
|
|
|
:topic topic)))))) |
|
|
|
chan))) |
|
|
|
chan)) |
|
|
|
(get-topic! [_ topic opts] |
|
|
|
(let [{:keys [qos timeout] :or {qos 0 timeout 5}} opts |
|
|
|
result-chan (async/chan)] |
|
|
|
(.subscribe client topic qos |
|
|
|
(proxy [IMqttMessageListener] [] |
|
|
|
(messageArrived [topic mqtt-message] |
|
|
|
(go (>! result-chan (assoc (parse-message mqtt-message) |
|
|
|
:topic topic)) |
|
|
|
(async/close! result-chan)) |
|
|
|
(.unsubscribe client topic)))) |
|
|
|
(first (alts!! [result-chan |
|
|
|
(async/timeout (* timeout 1000))]))))) |
|
|
|
|
|
|
|
(defn- parallelism [] |
|
|
|
(-> (Runtime/getRuntime) |
|
|
@ -71,6 +86,11 @@ |
|
|
|
(async/pipeline (parallelism) out xf in) |
|
|
|
out)) |
|
|
|
|
|
|
|
(defn- json-parse-message [msg] |
|
|
|
(-> msg |
|
|
|
(update :payload #(json/read-str % :key-fn keyword)) |
|
|
|
(assoc :timestamp (Instant/now)))) |
|
|
|
|
|
|
|
(defrecord MilquetoastJsonClient [client] |
|
|
|
IMilquetoastClient |
|
|
|
(send-message! [_ topic msg opts] |
|
|
@ -79,7 +99,11 @@ |
|
|
|
(add-channel! [_ chan] (add-channel! client chan)) |
|
|
|
(subscribe-topic! [_ topic opts] |
|
|
|
(pipe (subscribe-topic! client topic opts) |
|
|
|
(map (fn [msg] (update msg :payload json/read-str)))))) |
|
|
|
(map json-parse-message))) |
|
|
|
(get-topic! [_ topic opts] |
|
|
|
(if-let [msg (get-topic! client topic opts)] |
|
|
|
(json-parse-message msg) |
|
|
|
nil))) |
|
|
|
|
|
|
|
(defn send! [client topic msg & {:keys [qos retain] |
|
|
|
:or {qos 1 retain false}}] |
|
|
@ -98,9 +122,10 @@ |
|
|
|
(recur (<! chan)))) |
|
|
|
chan)) |
|
|
|
|
|
|
|
(defn subscribe! [client topic & {:keys [buffer-size] |
|
|
|
:or {buffer-size 1}}] |
|
|
|
(subscribe-topic! client topic {:buffer-size buffer-size})) |
|
|
|
(defn subscribe! [client topic & {:keys [buffer-size qos] |
|
|
|
:or {buffer-size 1 |
|
|
|
qos 1}}] |
|
|
|
(subscribe-topic! client topic {:buffer-size buffer-size :qos qos})) |
|
|
|
|
|
|
|
(defn connect! [& {:keys [host port scheme username password verbose] |
|
|
|
:or {verbose false |
|
|
|