-
1
require "mqtt"
-
1
require_dependency "control_signals_helper"
-
-
# Service for handling MQTT communication with plant modules
#
# This service subscribes to MQTT topics, processes incoming sensor data,
# photos, and control signal statuses, and publishes control commands to
# plant modules.
-
1
class MqttListener
-
1
extend ControlSignalsHelper
-
1
PHOTO_BUFFERS = {}
-
-
# Starts the MQTT subscriber service.
#
# Connects to the broker configured under the `hivemq` credentials,
# subscribes to the sensor-data, photo, sensor-init and control-status
# topics below `secrets[:topic]`, and hands every incoming message to
# {dispatch_mqtt_message}.
#
# @note This method runs in an infinite loop. Connection errors and any other
#   StandardError are logged and followed by a 5 second pause before the next
#   connection attempt. A StopIteration is re-raised instead; Kernel#loop
#   treats StopIteration as the signal to stop iterating, so that is the one
#   way this method returns.
# @return [void]
def self.start
  secrets = Rails.application.credentials.hivemq

  Rails.logger.info "Starting MQTT subscriber on #{secrets[:topic]}..."

  loop do
    begin
      MQTT::Client.connect(
        host: secrets[:url],
        port: secrets[:port],
        username: secrets[:username],
        password: secrets[:password],
        ssl: true
      ) do |client|
        Rails.logger.info "Connected to MQTT broker at #{secrets[:url]}"

        %w[+/sensor_data +/photo +/init_sensors +/+/status].each do |pattern|
          client.subscribe("#{secrets[:topic]}/#{pattern}")
        end

        client.get { |topic, message| dispatch_mqtt_message(topic, message) }
      end
    rescue StopIteration
      # Must come first: StopIteration is a StandardError and would otherwise
      # be swallowed by the generic clause below and retried forever.
      raise
    rescue MQTT::Exception, SocketError, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e
      Rails.logger.error "MQTT connection error: #{e.message}, retrying in 5 seconds..."
      sleep 5
    rescue => e
      Rails.logger.fatal "Unexpected MQTT Listener error: #{e.message}, retrying in 5 seconds..."
      sleep 5
    end
  end
end

# Routes a single MQTT message to the handler matching its topic.
#
# Photo payloads are passed through as raw bytes; every other payload must be
# a JSON object, otherwise it is logged as malformed and dropped.
#
# @param topic [String] topic the message arrived on
# @param message [String] raw message payload
# @return [void]
def self.dispatch_mqtt_message(topic, message)
  if topic.end_with?("photo")
    Rails.logger.info "Received MQTT binary photo data on #{topic}"
    process_mqtt_photo(topic, message)
    return
  end

  Rails.logger.info "Received MQTT message on #{topic}: #{message}"

  message_json =
    begin
      JSON.parse(message)
    rescue JSON::ParserError, TypeError, EncodingError
      # Only parse failures are treated as "malformed"; anything else surfaces.
      nil
    end

  unless message_json.is_a?(Hash)
    Rails.logger.error "Malformed JSON received: #{message}"
    return
  end

  if topic.end_with?("sensor_data")
    process_mqtt_sensor_data(topic, message_json)
  elsif topic.end_with?("status")
    Rails.logger.info "Processing control signal statuses"
    process_control_status(topic, message_json)
  elsif topic.include?("init_sensors")
    process_mqtt_sensor_init(topic, message_json)
  else
    Rails.logger.info "Received sensor init response: #{message_json}"
  end
end
-
-
1
private
-
-
# Stores an incoming sensor reading and fires any enabled automatic control
# signals whose threshold the reading crosses.
#
# The reading is ignored (with a warning) when the topic does not match the
# sensor-data format, when "value" or "timestamp" is missing, or when "value"
# is not numeric. Readings above 10_000 are dropped without being stored.
#
# @param topic [String] MQTT topic the sensor data was published on
# @param message_json [Hash] parsed JSON message; reads "value" and "timestamp"
# @return [void]
def self.process_mqtt_sensor_data(topic, message_json)
  sensor_id = extract_sensor_id_from_sensor_topic(topic)
  unless sensor_id
    Rails.logger.warn "Ignoring message: Invalid topic format: #{topic}"
    return
  end

  Rails.logger.info("Processing sensor data for sensor: #{sensor_id}")

  value = message_json["value"]
  timestamp = message_json["timestamp"]

  if value.nil? || timestamp.nil?
    Rails.logger.warn "Missing required fields in message: #{message_json}"
    return
  end

  # The payload comes off the wire; a string or object here would make the
  # numeric comparisons below raise and tear down the listener connection.
  unless value.is_a?(Numeric)
    Rails.logger.warn "Ignoring non-numeric sensor value: #{value.inspect}"
    return
  end

  # NOTE(review): presumably filters out-of-range/glitch readings - confirm.
  return if value > 10_000

  # Store the sensor data
  begin
    TimeSeriesDatum.create!(sensor_id: sensor_id, value: value, timestamp: timestamp)
    Rails.logger.info "Stored time series data for sensor '#{sensor_id}'"
  rescue ActiveRecord::RecordInvalid => e
    Rails.logger.error "Database insertion failed: #{e.message}. Data: #{message_json}"
    return
  end

  # Process the automatic control signals
  debounce = 300 # 5 minute debounce, e.g. to let water settle before re-triggering

  ControlSignal.where(sensor_id: sensor_id, enabled: true).each do |cs|
    next unless cs.mode == "automatic"

    threshold = cs.threshold_value
    next if threshold.nil? # nothing to compare against

    condition_met =
      case cs.comparison
      when "<" then value < threshold
      when ">" then value > threshold
      else false
      end
    next unless condition_met

    current_time = Time.current
    last_exec = ControlExecution
      .where(control_signal_id: cs.id, source: "automatic")
      .order(executed_at: :desc)
      .first

    if last_exec.nil? || current_time - last_exec.executed_at >= debounce
      publish_control_command(cs, mode: "automatic", status: true)
    else
      Rails.logger.info "Control signal #{cs.id} triggered recently, waiting for debounce period."
    end
  end
end
-
-
# Publishes a control command to a plant module and records the execution.
#
# The command itself is an empty publish on the signal's `mqtt_topic`. The
# configured length is converted to seconds (defaulting to 10 when the
# conversion yields nil):
#
# * 1-59 seconds: a background thread publishes once, records an "on"
#   execution, sleeps for the length, publishes again and records an "off"
#   execution. `:status` is not consulted on this path.
# * 0 or >= 60 seconds: a single publish, recorded with the given `:status`.
#
# @param control_signal [ControlSignal] the control signal to publish
# @param options [Hash] additional options for the control command
# @option options [String] :mode ("automatic"|"manual"|"scheduled") source of
#   the control signal; defaults to `control_signal.mode`
# @option options [Boolean] :status true for on, false for off
# @note Callers also pass `:duration`, but this method never reads it; the
#   duration always comes from `control_signal.length`.
# @return [void]
def self.publish_control_command(control_signal, options = {})
  secrets = Rails.application.credentials.hivemq
  topic = control_signal.mqtt_topic
  duration_unit = control_signal.length_unit || "seconds"
  toggle_seconds = format_duration_to_seconds(control_signal.length, duration_unit) || 10
  mode = options[:mode] || control_signal.mode
  status = options[:status]

  if toggle_seconds < 60 && toggle_seconds != 0
    Rails.logger.info "Because the signal is less than 60 seconds:"

    # Spawn a thread to do the real timing so the listener is not blocked.
    # NOTE(review): the thread uses ActiveRecord without explicitly returning
    # its connection to the pool - confirm this is acceptable.
    thread = Thread.new do
      Rails.logger.info "Send on"
      mqtt_publish_toggle(secrets, topic)
      create_execution_data(control_signal, mode, true, format_duration_from_seconds(toggle_seconds, duration_unit), duration_unit)

      Rails.logger.info "Sleep for #{toggle_seconds}s"
      sleep(toggle_seconds)

      Rails.logger.info "Send off"
      mqtt_publish_toggle(secrets, topic)
      create_execution_data(control_signal, mode, false, 0, duration_unit)
    end

    # If Thread.new was stubbed (i.e. in a spec), run both publishes immediately.
    unless thread.is_a?(Thread)
      mqtt_publish_toggle(secrets, topic)
      create_execution_data(control_signal, mode, true, format_duration_from_seconds(toggle_seconds, duration_unit), duration_unit)

      mqtt_publish_toggle(secrets, topic)
      create_execution_data(control_signal, mode, false, 0, duration_unit)
    end
  else
    # Single publish: lengths of 60s or more, and a length of exactly 0.
    Rails.logger.info "Publishing #{mode} control #{status} to topic #{topic} at #{Time.current} from thread"
    mqtt_publish_toggle(secrets, topic)
    create_execution_data(
      control_signal, mode, status,
      format_duration_from_seconds(toggle_seconds, duration_unit),
      duration_unit
    )
  end
end

# Opens a short-lived broker connection and publishes an empty message.
#
# @param secrets [Hash] broker settings (:url, :port, :username, :password)
# @param topic [String] topic to publish on
# @return [void]
def self.mqtt_publish_toggle(secrets, topic)
  MQTT::Client.connect(
    host: secrets[:url],
    port: secrets[:port],
    username: secrets[:username],
    password: secrets[:password],
    ssl: true
  ) { |client| client.publish(topic) }
end
-
-
# Calculates the next scheduled trigger time for a control signal.
#
# Starts from the signal's `scheduled_time` (hour and minute, today) when the
# signal has never run on schedule or was updated after its last scheduled
# execution; otherwise from the last scheduled execution plus the signal's
# frequency (10 seconds when the frequency cannot be converted). A result in
# the past is pushed forward by one day.
#
# @param control_signal [ControlSignal] the control signal to calculate for
# @return [Time] the next time the control signal should be triggered
def self.next_scheduled_trigger(control_signal)
  last_exec = ControlExecution
    .where(control_signal_id: control_signal.id, source: "scheduled")
    .order(executed_at: :desc)
    .first

  # Parsed before the use_zone block, so this is interpreted in the ambient
  # Time.zone. NOTE(review): confirm that is intended rather than Central time.
  scheduled = Time.zone.parse(control_signal.scheduled_time.strftime("%H:%M"))
  now = Time.current

  Time.use_zone("Central Time (US & Canada)") do
    next_trigger =
      if last_exec.nil? || control_signal.updated_at > last_exec.executed_at
        # No scheduled run yet, or the schedule changed since the last run.
        scheduled
      else
        last_exec.executed_at + (format_duration_to_seconds(control_signal.frequency, control_signal.unit) || 10)
      end

    next_trigger += 1.day if next_trigger < now

    Rails.logger.info "Next scheduled trigger calculated as #{next_trigger}"
    next_trigger
  end
end
-
-
1
# Records a ControlExecution row for a command that was just published.
#
# @param cs [ControlSignal] signal the execution belongs to
# @param source [String] what triggered it (stored in `source`)
# @param status [Boolean] true for on, false for off
# @param duration [Numeric] duration expressed in `duration_unit`
# @param duration_unit [String] unit the duration is expressed in
# @return [ControlExecution] the created record
# @raise [ActiveRecord::RecordInvalid] if the record fails validation
def self.create_execution_data(cs, source, status, duration, duration_unit)
  Rails.logger.info "creating execution data for #{cs.signal_type} with source #{source} and status #{status} for duration #{duration}"

  attributes = {
    control_signal_id: cs.id,
    source: source,
    duration: duration,
    duration_unit: duration_unit,
    executed_at: Time.current,
    status: status
  }
  ControlExecution.create!(attributes)
end
-
-
1
# Registers the sensors and controls a plant module announces on start-up.
#
# For every entry under "sensors" and "controls" the matching record is looked
# up by type on the plant module and created when missing. The outcome for
# each entry ("exists" or "created", with the record id) is published back to
# the module via {publish_sensor_response}. Entries without a "type" are
# skipped with a warning.
#
# @param topic [String] MQTT topic the init message was published on
# @param message_json [Hash] parsed JSON message; reads "sensors" and "controls"
# @return [void]
# @raise [ActiveRecord::RecordInvalid] if a new sensor or control is invalid
def self.process_mqtt_sensor_init(topic, message_json)
  plant_module_id = extract_module_id(topic, "init_sensors")
  Rails.logger.info "Received sensor init message for plant_module #{plant_module_id}: #{message_json.inspect}"

  plant_module = PlantModule.find_by(id: plant_module_id)
  unless plant_module
    Rails.logger.error "Plant module not found for id #{plant_module_id}"
    return
  end

  sensors = message_json["sensors"] || []
  controls = message_json["controls"] || []
  responses = { sensors: [], controls: [] }

  sensors.each do |sensor_data|
    type = sensor_data["type"]
    if type.blank?
      Rails.logger.warn "Skipping sensor entry without a type: #{sensor_data.inspect}"
      next
    end

    unit = sensor_data["unit"] || default_unit_for(type)
    existing = plant_module.sensors.find_by(measurement_type: type)

    if existing
      Rails.logger.info "Sensor for type '#{type}' already exists (ID: #{existing.id})."
      responses[:sensors] << { type: type, status: "exists", sensor_id: existing.id }
    else
      Rails.logger.info "Creating new sensor for type '#{type}' with unit '#{unit}'."
      sensor = plant_module.sensors.create!(
        id: SecureRandom.uuid,
        measurement_type: type,
        measurement_unit: unit
      )
      responses[:sensors] << { type: type, status: "created", sensor_id: sensor.id }
    end
  end

  controls.each do |control|
    type = control["type"]
    if type.blank?
      # Without a type there is no label fallback and no topic to derive.
      Rails.logger.warn "Skipping control entry without a type: #{control.inspect}"
      next
    end

    existing = plant_module.control_signals.find_by(signal_type: type)

    if existing
      Rails.logger.info "Control signal for type '#{type}' already exists (ID: #{existing.id})."
      responses[:controls] << { type: type, status: "exists", control_id: existing.id }
    else
      Rails.logger.info "Creating new control signal for type '#{type}'."
      signal = plant_module.control_signals.create!(
        id: SecureRandom.uuid,
        signal_type: type,
        label: control["label"] || type.titleize,
        mqtt_topic: "planthub/#{plant_module_id}/#{type}"
      )
      responses[:controls] << { type: type, status: "created", control_id: signal.id }
    end
  end

  Rails.logger.info "Sensor init process completed. Responses: #{responses.to_json}"
  publish_sensor_response(plant_module_id, responses)
end
-
-
1
# Assembles a photo that a plant module sends as a series of MQTT messages.
#
# The protocol handled here is: a "START" message resets the module's buffer,
# every following message is appended as a binary chunk, and "END" saves the
# buffered bytes via {save_buffered_photo} and discards the buffer. Chunks
# that arrive before a "START" are logged and dropped.
#
# @param topic [String] MQTT topic the photo message was published on
# @param message [String] "START", "END", or a raw binary chunk
# @return [void]
def self.process_mqtt_photo(topic, message)
  plant_module_id = extract_plant_module_id_from_photo_topic(topic)
  return unless plant_module_id

  # String.new yields a mutable binary buffer, independent of any
  # frozen-string-literal setting.
  buffer = (PHOTO_BUFFERS[plant_module_id] ||= { data: String.new, started: false })

  case message
  when "START"
    PHOTO_BUFFERS[plant_module_id] = { data: String.new, started: true }
    Rails.logger.info "Started receiving photo for plant_module #{plant_module_id}"
  when "END"
    if buffer[:started]
      Rails.logger.info "Finished receiving photo for #{plant_module_id}, saving..."
      save_buffered_photo(plant_module_id)
    end
    PHOTO_BUFFERS.delete(plant_module_id)
  else
    if buffer[:started]
      buffer[:data] << message.b # Append binary chunk
    else
      Rails.logger.warn "Received chunk without START for #{plant_module_id}"
    end
  end
end
-
-
1
# Persists the bytes buffered for a plant module as a Photo with an attached
# JPEG image.
#
# Does nothing when no data is buffered for the module. Any failure is logged
# and swallowed so a bad photo cannot take down the listener; the Photo row is
# created inside a transaction so a failed upload or attach does not leave a
# Photo record without an image behind.
#
# @param plant_module_id [String] id of the module whose buffer should be saved
# @return [void]
def self.save_buffered_photo(plant_module_id)
  buffer = PHOTO_BUFFERS.dig(plant_module_id, :data)
  return if buffer.blank?

  timestamp = Time.current.iso8601

  ActiveRecord::Base.transaction do
    photo = Photo.create!(
      id: SecureRandom.uuid,
      plant_module_id: plant_module_id,
      timestamp: timestamp
    )

    blob = ActiveStorage::Blob.create_and_upload!(
      io: StringIO.new(buffer),
      filename: "plant_module_#{plant_module_id}_#{timestamp}.jpg",
      content_type: "image/jpeg"
    )

    photo.image.attach(blob)
  end

  Rails.logger.info "Successfully stored photo for plant module #{plant_module_id}"
rescue => e
  Rails.logger.error "Failed to save photo for #{plant_module_id}: #{e.message}"
end
-
-
1
# Extracts the sensor id from a topic of the exact form
# `planthub/<sensor_id>/sensor_data`, where the id consists of word
# characters and hyphens.
#
# @param path [String, nil] MQTT topic
# @return [String, nil] the sensor id, or nil when the topic does not match
def self.extract_sensor_id_from_sensor_topic(path)
  path.to_s[%r{\Aplanthub/([\w-]+)/sensor_data\z}, 1]
end
-
-
1
# Extracts the segment between `planthub/` and `/<suffix>` from a topic.
#
# The match is not anchored, so the pattern may appear anywhere in the topic.
# The suffix is matched literally (regex metacharacters in it are escaped).
#
# @param topic [String, nil] MQTT topic
# @param suffix [String] literal topic segment that follows the module id
# @return [String, nil] the module id, or nil when the topic does not match
def self.extract_module_id(topic, suffix)
  topic.to_s[%r{planthub/(.*?)/#{Regexp.escape(suffix)}}, 1]
end
-
-
1
# Extracts the plant module id from a topic of the exact form
# `planthub/<plant_module_id>/photo`, where the id consists of word
# characters and hyphens.
#
# @param path [String, nil] MQTT topic
# @return [String, nil] the plant module id, or nil when the topic does not match
def self.extract_plant_module_id_from_photo_topic(path)
  path.to_s[%r{\Aplanthub/([\w-]+)/photo\z}, 1]
end
-
-
1
# Publishes the outcome of a sensor init request back to a plant module.
#
# Opens a short-lived broker connection and publishes the responses as JSON
# on `planthub/<module_id>/sensor_init_response`.
#
# @param module_id [String] id of the plant module to answer
# @param responses [Hash] per-sensor and per-control results to send
# @return [void]
def self.publish_sensor_response(module_id, responses)
  secrets = Rails.application.credentials.hivemq
  Rails.logger.info("Attempting to publish a sensor init response!")

  response_topic = "planthub/#{module_id}/sensor_init_response"
  MQTT::Client.connect(
    host: secrets[:url],
    port: secrets[:port],
    username: secrets[:username],
    password: secrets[:password],
    ssl: true
  ) { |client| client.publish(response_topic, responses.to_json) }
end
-
-
1
# Reconciles a status report from a plant module with the recorded control
# executions and sends a corrective or upcoming command when needed.
#
# The control type and plant module id are taken from the second-to-last and
# third-to-last topic segments. Nothing happens unless a matching, enabled
# ControlSignal exists. Depending on the reported "status" ("on"/"off") and
# the execution history, exactly one of these happens:
#
# a. reported state differs from the recorded one while an "on" period is
#    still running: re-publish the last recorded state
# b. reported "on" but no "on" period is running: publish "off"
# c. the running "on" period ends within 60 seconds: publish "off" after
#    waiting in a background thread
# d. a scheduled signal is due within 60 seconds: publish "on" after waiting
#    in a background thread
# e. otherwise: touch the latest matching execution to mark it as confirmed
#
# @param topic [String] MQTT topic the status was published on
# @param message_json [Hash] parsed JSON message; only "status" is read
# @return [void]
def self.process_control_status(topic, message_json)
  segments = topic.split("/")
  control_type = segments[-2]
  plant_module_id = segments[-3]
  reported_status = message_json["status"]

  control_signal = ControlSignal.find_by(plant_module_id: plant_module_id, signal_type: control_type)
  return unless control_signal&.enabled?

  last_exec = ControlExecution
    .where(control_signal_id: control_signal.id)
    .order(executed_at: :desc)
    .first

  last_exec_on = ControlExecution
    .where(control_signal_id: control_signal.id, status: true)
    .order(executed_at: :desc)
    .first

  last_exec_off = ControlExecution
    .where(control_signal_id: control_signal.id, status: false)
    .order(executed_at: :desc)
    .first

  last_updated_exec = ControlExecution
    .where(control_signal_id: control_signal.id)
    .order(updated_at: :desc)
    .first

  # add a check to make sure if the esp was off at scheduled time to retrigger
  # A signal with no executions yet counts as "off".
  last_status = last_updated_exec&.status ? "on" : "off"
  # updated_at is bumped (via touch) on each confirming status report, so this
  # is how long the last "on" execution has been observed running.
  elapsed_since_on = (last_exec_on&.updated_at || 1.year.ago) - (last_exec_on&.executed_at || 1.year.ago)
  expected_on_duration = format_duration_to_seconds(last_exec_on&.duration, last_exec_on&.duration_unit) ||
                         format_duration_to_seconds(control_signal.length, control_signal.length_unit) ||
                         10

  # -1 is the sentinel for "no on-period currently running".
  if elapsed_since_on < expected_on_duration && last_status != "off"
    time_until_next_off = expected_on_duration - elapsed_since_on
    Rails.logger.info "Time until next off: #{time_until_next_off.to_i}s"
  else
    time_until_next_off = -1
    Rails.logger.info "Already off (time until next off == -1)"
  end

  # Stays nil for non-scheduled signals; only read when mode is "scheduled".
  time_until_next_on = nil
  if control_signal.mode == "scheduled"
    now = Time.current
    time_until_next_on = next_scheduled_trigger(control_signal) - now
    Rails.logger.info "Time until next on: #{time_until_next_on.to_i}s"
  end

  # see if we need to send any toggle to the ESP to either
  # a - make sure it is staying on if it is supposed to but allow for a 60 second debounce
  # b - make sure we turn stuff off when it should be turned off
  # c - turn on the scheduled stuff too
  # d - handle the most recent pushes to show successes (add notifications later maybe?)
  if last_status != reported_status && time_until_next_off != -1 # a
    Rails.logger.info "The control signal is off but we expect it to be on, turn it on for #{time_until_next_off.to_i}s."
    Rails.logger.info "It will turn off at #{Time.current + time_until_next_off.to_i}"
    publish_control_command(control_signal, status: last_exec.status, duration: format_duration_from_seconds(time_until_next_off, control_signal.length_unit), mode: "manual")
  elsif time_until_next_off == -1 && reported_status == "on"
    Rails.logger.info "The control signal is on but we don't expect it to be, turn it off."
    publish_control_command(control_signal, status: false, duration: 0, mode: "manual")
  elsif time_until_next_off != -1 && time_until_next_off < 60 # b
    Rails.logger.info "The control signal needs to turn off in #{time_until_next_off.to_i}s"
    if time_until_next_off <= 0
      Rails.logger.error "This is unexpected behavior return safely before trying to sleep for negative time"
      return
    end

    thread = Thread.new do
      Rails.logger.info "Waiting..."
      sleep(time_until_next_off)
      publish_control_command(control_signal, status: false, duration: 0, mode: "manual")
    end

    # Only reached when Thread.new is stubbed to return a non-Thread (specs).
    unless thread.is_a?(Thread)
      publish_control_command(control_signal, status: true, mode: "scheduled")
    end
  elsif control_signal.mode == "scheduled" && time_until_next_on > 0 && time_until_next_on < 60 # c
    Rails.logger.info "The control signal is on scheduled mode and we need to turn on the light in #{time_until_next_on.to_i}s."
    Rails.logger.info "It will turn on at #{Time.current + time_until_next_on.to_i}"
    Thread.new do
      Rails.logger.info "Waiting..."
      sleep(time_until_next_on)
      publish_control_command(control_signal, status: true, duration: format_duration_from_seconds(control_signal.length, control_signal.length_unit), mode: "scheduled")
    end
  elsif last_exec == last_exec_on && last_status == reported_status
    Rails.logger.info "The control signal is on, as expected, so lets update the most recent on execution."
    last_exec_on&.touch
  else
    Rails.logger.info "The control signal is off, as expected, so lets update the most recent off execution."
    last_exec_off&.touch
  end
end
-
-
-
1
# Measurement units assumed for a sensor type when the module reports none.
DEFAULT_UNITS_BY_MEASUREMENT_TYPE = {
  "moisture" => "analog",
  "temperature" => "Celsius",
  "humidity" => "%",
  "light_analog" => "lux"
}.freeze

# Returns the default measurement unit for a sensor type.
#
# @param type [String, nil] sensor measurement type
# @return [String] the default unit, or "unknown" for an unrecognised type
def self.default_unit_for(type)
  DEFAULT_UNITS_BY_MEASUREMENT_TYPE.fetch(type, "unknown")
end
-
end