IoT - Using a working queue for my Domotica apps
Author: Jan Willem Teunisse, 26 March 2020 (edited)
Introduction
This article describes the queuing system of my domotica system, what it is, how to use it, and the benefits of using a queue in a domotica
architecture.
A definition of a queue is:
A queue is a line of things, such as actions or messages, waiting to be handled in sequential order, starting at the beginning of the line.
For instance, a message queue is a queue of messages sent between applications. It contains a sequence of work objects that are waiting to be processed.
A message is the data transported between the sender and the receiver application; it is essentially a byte array with some headers at the top.
A message could, for example, tell a system to start processing a task, contain information about a finished task, or
just be a plain message.
The basic architecture of a message or worker queue is simple: client applications called senders create action messages and deliver
them to the worker queue. Another application or subroutine, called a worker, connects to the queue and gets the action messages to be processed.
Action messages placed onto the queue are stored until the worker retrieves them and processes them, either as soon as possible or at a certain time.
Examples are a webserver that queues client requests for one or more workers that perform resource-intensive tasks or batch jobs,
or an e-mail system. A queueing system is based on asynchronous communication and normally works on a First-In, First-Out (FIFO) basis.
Other, special forms handle messages on a priority basis or at a given date-time (a scheduler).
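The sender/worker principle described above can be sketched in Go with a buffered channel acting as the FIFO queue. This is only an illustration of the idea, not the implementation used in my domotica system; the names ActionMessage, sendAll and runWorker are hypothetical.

```go
package main

import "fmt"

// ActionMessage is a hypothetical message type for this sketch.
type ActionMessage struct {
	Sender string
	Action string
}

// sendAll plays the sender role: it places its messages onto the queue.
func sendAll(queue chan<- ActionMessage, sender string, actions []string) {
	for _, a := range actions {
		queue <- ActionMessage{Sender: sender, Action: a}
	}
}

// runWorker plays the worker role: it takes messages from the queue in
// FIFO order until the queue is closed, and returns what it processed.
func runWorker(queue <-chan ActionMessage) []string {
	var processed []string
	for msg := range queue {
		processed = append(processed, msg.Sender+":"+msg.Action)
	}
	return processed
}

func main() {
	queue := make(chan ActionMessage, 8) // buffered channel used as the queue
	done := make(chan []string)

	// The worker runs asynchronously from the sender.
	go func() { done <- runWorker(queue) }()

	sendAll(queue, "web", []string{"light-on", "light-off"})
	close(queue) // no more messages: lets the worker finish

	for _, line := range <-done {
		fmt.Println(line)
	}
}
```

The sender never waits for the worker to finish a task; it only waits when the buffer of the channel is full.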
In my domotica system I use two types of worker queues: 1) an in-memory queue based on an array, and 2) a queue table in a sqlite3 database file.
1) In memory: in my ZWave webserver, the HTTP microservice commands to switch a light or wall plug on/off are placed in a queue array until
the ZWave USB stick is free to send the command into the ZWave mesh network;
2) The sqlite3 queue table stores the action messages until it is time for the worker to process the requested action at the requested date-time.
My domotica system is based on a couple of ZWave and Plugwise (Zigbee) components and a few Raspberry Pis (RPI) in order to monitor my 'smart' Dutch energy meter, to gather data from several temperature/humidity sensors and to control an OpenTherm central heating system by means of an OpenTherm Gateway (OTGW) and an RPI sunscreen controller.
In my small IoT project the domotica tools run on a Windows 10 Shuttle micro desktop. Nodejs and the Javascript ZWave and RF (433 MHz) handlers are also implemented on this server. The Plugwise Zigbee mesh network is controlled by a separate Stretch controller (a closed Linux box). Data is stored in an Oracle XE database and, for the queue table, in a Sqlite3 database file. The Sqlite3 queue database is one of the subjects of this article.
I could have used one of the available open source or commercial domotica systems, but as a retired electronics & software engineer I preferred to develop my domotica system myself, based on open source tools.
Reference material for further reading:
- [1] The principle of a circular buffer, as described on Wikipedia;
- [2] Information on queues in Javascript: a nice overview article to start with;
- [3] An article that describes the web-queue-worker principles.
Structure of my working queues
Based on an array Queue
I started with an implementation of the Javascript queue as described in reference [2] for my ZWave handler webserver.
The ZWave handler webserver consists of two parts. Part 1) receives and sends ZWave messages from and into the ZWave mesh network.
It handles the messages received from temperature/humidity, window/door & PIR sensors and the feedback messages (status or energy consumption data)
of switches like lights and wall plugs.
From the queue array it sends command messages to switch lights on/off, get the battery status, etc.
Part 2) provides the web based microservices, like HTTP requests to switch a light or wall plug on/off. These requests (the sender part) are queued in the queue array.
The command messages (cmdMsg) are stored in a 2-dimensional array: the first element of each row contains the message type (msgType) string,
the second element is the ZWave command, an array of bytes.
The function SQAdd(msgType, cmdMsg) adds the command to the end of the array and returns the index of the array where the data is stored.
The function SQFrontOut() returns the first command in the row and deletes this entry (First-In, First-Out).
Supported functions are:
- isSQEmpty()
- isSQFilled()
- SQPeekFirst() returns the value [msgType, cmdMsg]
- SQSize() returns the number of elements in the array
The javascript source code is as follows:
My prototype in Nodejs Javascript
// testqueue.js FIFO principle, on the basis of a 2-dimensional array
// array SQ = [msgType, cmdMsg] like ['setswitch', buffer cmdMsg]
// functions:
// isSQEmpty() - boolean if the array is empty
// isSQFilled() - boolean if the array has messages
// SQAdd(msgType, buffer cmdMsg) - returns the index of the array
// SQPeekFirst() - returns value [msgType, buffer cmdMsg], without deleting the row
// SQFrontOut() - returns value [msgType, buffer cmdMsg] and deletes the row in the SQ array
// SQSize() - returns the size or number of the array elements
var SQ = [] ; // array SQ = SendQueue, starts empty; each row is [msgType, cmdMsg] like ['setswitch', buffer cmdMsg]
var SQRow = [] ;
var cmdMsg = [] ;
var msgType = 'empty' ;
var SQidx = 0 ;
var SQsize = 0 ;
var Run_SQ = true ; // if false, stop sending messages from the SQ queue
function isSQEmpty() {
  let lResult = false ;
  if (SQ.length == 0) {
    lResult = true ;
  }
  return lResult ;
}
function isSQFilled() {
  let lResult = false ;
  if (SQ.length > 0) {
    lResult = true ;
  }
  return lResult ;
}
function SQAdd(pMsg, pBfr) { // add new cmdMsg at the end of the array
  let lIdx = 0;
  SQ.push([pMsg,pBfr]) ;
  lIdx = SQ.length - 1 ;
  return lIdx;
}
function SQPeekFirst() { // retrieve the first cmdMsg
  return SQ[0] ;
}
function SQFrontOut() { // retrieve the first cmdMsg and delete this row
  return SQ.shift() ;
}
function SQSize() { // returns the number of cmdMsg's
  return SQ.length ;
}
var nodeID = "" ;
var nodeName = "";
console.log('test queue functions') ;
SQRow = SQFrontOut() ; // reading an empty queue returns undefined
SQidx = SQAdd('setswitch', [1, 2, 255, 0, 5]) ; // cmdMsg's have different sizes
SQidx = SQAdd('getswitch', [2, 3, 254, 0, 5, 6]) ;
SQidx = SQAdd('getbattery', [3, 3, 255, 0, 5, 6, 7]) ;
console.log('size is ', SQSize() );
console.log('Queue is ', SQ) ;
SQRow = SQPeekFirst() ;
console.log('SQPeek ',SQPeekFirst() ) ;
while (Run_SQ) {
  if (isSQFilled() ) {
    console.log(' SQFront: ', SQFrontOut() ) ;
    console.log(' size is: ', SQSize() );
    console.log(' Boolean Empty: ', isSQEmpty() );
    console.log(' Boolean Filled: ', isSQFilled() );
  } else {
    Run_SQ = false ;
  }
}
process.on( 'SIGINT', function () {
  console.log("SIGINT: Shutting down the test.");
  Run_SQ = false ;
  setTimeout(function() {
    console.log('Exiting..');
    process.exit(process.exitCode);
  }, 100);
});
The principle is also tested using the Go language (Golang).
According to some Golang articles on this subject, it is not recommended to use arrays for a queue, because of
memory leaks: memory that is no longer used is not given back to the system.
For more information see this web publication.
Another interesting article is this queue datastructure.
Based on this article and the one about 'genny - Generics for Go' I implemented a
similar working queue. The source code is as follows:
My queue prototype in Go
package main

// testqueue.go FIFO principle, on the basis of a 2-dimensional 'array'
// array = [msgType, cmdMsg] like ['setswitch', buffer cmdMsg]
// functions:
// IsQueueEmpty() - boolean true if queue is empty
// IsQueueFilled() - boolean true if queue is filled
// QueRows.AddQueue(item SQRow) - add a row at the end of the queue
// to-do QueuePeekFirst() - returns value [msgType, buffer cmdMsg] of first item in the queue
// QueRows.FrontOut() - returns the value [msgType, buffer cmdMsg] and deletes the row in the queue
// QueueSize() - returns the size or number of items in the queue
import (
	"fmt"
)

type SQRow struct { // SQRow is a row in type QueRows
	MsgType string
	CmdMsg  []byte
}

type QueRows struct { // QueRows is a queue of multiple SQRow's.
	items []SQRow
}

var msgType string
var Run_SQ bool // if false, stop sending messages from the queue

// function NewQueue() creates a new instance of type QueRows
func NewQueue() *QueRows {
	return &QueRows{items: make([]SQRow, 0)}
}

// method q.AddQueue adds a new item at the end of the queue (also called EnQueue)
func (q *QueRows) AddQueue(item SQRow) {
	q.items = append(q.items, item)
}

// method q.FrontOut retrieves the first item from the queue and deletes this item from the queue (also called DeQueue)
// It panics on an empty queue, so check IsQueueFilled before calling it.
func (q *QueRows) FrontOut() SQRow {
	item := q.items[0]
	q.items = q.items[1:]
	return item
}

// function IsQueueEmpty returns true if the queue is empty
func IsQueueEmpty(pQ *QueRows) bool {
	lResult := false
	if len(pQ.items) == 0 {
		lResult = true
	}
	return lResult
}

// function IsQueueFilled returns true if the queue has rows
func IsQueueFilled(pQ *QueRows) bool {
	lResult := false
	if len(pQ.items) > 0 {
		lResult = true
	}
	return lResult
}

// function QueueSize returns how many queue rows are in the queue
func QueueSize(pQ *QueRows) int {
	return len(pQ.items)
}

func main() {
	var sqrow SQRow
	fmt.Println("test queue functions")
	msgType = "setswitch"
	sqrow.MsgType = msgType
	sqrow.CmdMsg = []byte{0x01, 0x08, 0x00, 0x04, 0x00, 0xef, 0x02, 0x27, 0x04, 0x39}
	zwq := NewQueue() // initiates a ZWave command message queue
	zwq.AddQueue(sqrow)
	sqrow.MsgType = "getswitchstate"
	sqrow.CmdMsg = []byte{0x01, 0x09, 0x00, 0x13, 0x00, 0x02, 0x25, 0x02, 0x25, 0x00}
	zwq.AddQueue(sqrow)
	sqrow.MsgType = "getnodeinfo"
	sqrow.CmdMsg = []byte{0x01, 0x04, 0x00, 0x60, 0x00, 0x00}
	zwq.AddQueue(sqrow)
	fmt.Println("Queue: ", zwq)
	fmt.Println("Size is ", QueueSize(zwq))
	Run_SQ = true
	for Run_SQ {
		if IsQueueFilled(zwq) {
			sqrow = zwq.FrontOut()
			fmt.Println(" Queue Front: ", sqrow.MsgType, ", cmd: ", sqrow.CmdMsg)
			fmt.Println(" Size is: ", QueueSize(zwq))
		} else {
			Run_SQ = false
		}
	}
	fmt.Println("End of test queue functions")
}
The previous example is not concurrency safe. Therefore we need to import the "sync" package from the standard library and use a lock and unlock mechanism in the methods, like this. Otherwise concurrent access from several goroutines is a data race, which can corrupt the queue or cause a panic at run time.
package main

import (
	"fmt"
	"sync"
)

type SQRow struct { // SQRow is a row in type QueRows
	MsgType string
	CmdMsg  []byte
}

type QueRows struct { // QueRows is a queue of multiple SQRow's.
	items []SQRow
	lock  sync.RWMutex
}

var msgType string
var Run_SQ bool // if false, stop sending messages from the queue

// function NewQueue() creates a new instance of type QueRows
func NewQueue() *QueRows {
	return &QueRows{items: make([]SQRow, 0)}
}

// method q.AddQueue adds a new item at the end of the queue (also called EnQueue)
func (q *QueRows) AddQueue(item SQRow) {
	q.lock.Lock()
	q.items = append(q.items, item)
	q.lock.Unlock()
}

// method q.FrontOut retrieves the first item from the queue and deletes this item from the queue (also called DeQueue)
func (q *QueRows) FrontOut() SQRow {
	q.lock.Lock()
	item := q.items[0]
	q.items = q.items[1:]
	q.lock.Unlock()
	return item
}
.....
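With several goroutines, checking IsQueueFilled first and calling FrontOut afterwards is no longer safe, because another goroutine can empty the queue between the two calls. The following is a minimal sketch of one way to deal with that, not the code elided above: the dequeue method itself reports whether a row was available. The names SafeQueue, Add, TryFrontOut and Size are hypothetical. The sketch also clears the dequeued slot, so that the backing array no longer references the CmdMsg bytes.

```go
package main

import (
	"fmt"
	"sync"
)

// SQRow mirrors the row type used in the listings above.
type SQRow struct {
	MsgType string
	CmdMsg  []byte
}

// SafeQueue is a hypothetical concurrency-safe FIFO queue of SQRow values.
type SafeQueue struct {
	mu    sync.Mutex
	items []SQRow
}

// Add appends a row at the end of the queue.
func (q *SafeQueue) Add(item SQRow) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, item)
}

// TryFrontOut removes and returns the first row. The boolean is false
// when the queue is empty, so an empty slice is never indexed.
func (q *SafeQueue) TryFrontOut() (SQRow, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return SQRow{}, false
	}
	item := q.items[0]
	q.items[0] = SQRow{} // drop the reference held by the backing array
	q.items = q.items[1:]
	return item, true
}

// Size returns the number of rows in the queue.
func (q *SafeQueue) Size() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.items)
}

func main() {
	q := &SafeQueue{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // four concurrent senders
		wg.Add(1)
		go func(n byte) {
			defer wg.Done()
			q.Add(SQRow{MsgType: "setswitch", CmdMsg: []byte{n}})
		}(byte(i))
	}
	wg.Wait()
	fmt.Println("size after adding:", q.Size())

	for {
		row, ok := q.TryFrontOut()
		if !ok {
			break // queue is empty
		}
		fmt.Println("front:", row.MsgType, row.CmdMsg)
	}
	fmt.Println("size after draining:", q.Size())
}
```

The order in which the four senders add their rows is not fixed, but every row is delivered exactly once and the queue ends up empty.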
Based on a worker Queue database table
Senders like my autodomus_services, zwwebsrvr, rflinkhandler, etc. put action messages into a worker queue table, and a couple
of separate worker apps retrieve their action messages and execute them as soon as possible or at the specified date-time.
To test this I have chosen a SQLite3 database table.
The database file is called autodomus.db and the table name is queue.
In this case the queue table is based on the principles of circular buffers [1]:
I do not delete the First-Out row, but control the FIFO mechanism with the status column in the table.
See the following picture for the various states of an action message.
The columns of the queue table are:
column | definition | remark |
rowid | | provided by sqlite3 |
id | INTEGER PRIMARY KEY UNIQUE NOT NULL | |
seqnr | INTEGER NOT NULL | |
datetime | VARCHAR (14) NOT NULL | |
sender | VARCHAR (10) | |
status | CHAR (1) NOT NULL DEFAULT 'N' | |
repeat | CHAR (3) DEFAULT ('#') | |
counter | INTEGER DEFAULT (1) | value 0 == continuous process |
system | VARCHAR (5) NOT NULL | |
worker | VARCHAR (10) NOT NULL | |
lastupdated | TEXT (14) NOT NULL DEFAULT (0) | |
action | VARCHAR (100) NOT NULL | |
args | VARCHAR (200) DEFAULT ('*') | '*' means no arguments |
The sender is responsible for the values of the added table row, with the exception of the columns rowid and id.
The different states of a queue row are defined by the status column.
These states occur in the following order (see also the picture):
- N (New) | a new action message, waiting to be processed by the worker |
- P (Processing) | the worker is busy processing the action message |
- F (Finished) | the worker has successfully processed the action message |
- A (Available) | the table row is available to be used again by a sender => next status := 'N' |
- S (Stopped) | the action message is (temporarily) stopped by a controller (an app or a human) |
- R (Reserved) | or value '-' |
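The main cycle of these states can be sketched as a small Go function. This is an illustration under assumptions: only the order N, P, F, A and back to N is taken from the list above. The transitions into and out of Stopped and Reserved are not spelled out in the text, so the sketch leaves them out. The name nextStatus is hypothetical.

```go
package main

import "fmt"

// Status codes as listed above.
const (
	StatusNew        = 'N'
	StatusProcessing = 'P'
	StatusFinished   = 'F'
	StatusAvailable  = 'A'
)

// nextStatus returns the status that follows s in the main cycle
// N -> P -> F -> A -> N. The boolean is false for any other status
// (for example 'S' or 'R'), which this sketch does not model.
func nextStatus(s rune) (rune, bool) {
	switch s {
	case StatusNew:
		return StatusProcessing, true
	case StatusProcessing:
		return StatusFinished, true
	case StatusFinished:
		return StatusAvailable, true
	case StatusAvailable:
		return StatusNew, true
	}
	return s, false
}

func main() {
	s := rune(StatusNew)
	for i := 0; i < 4; i++ {
		next, _ := nextStatus(s)
		fmt.Printf("%c -> %c\n", s, next)
		s = next
	}
}
```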
The function addNewQueueRow first looks for the row with the lowest id that has the status Available. If such a row exists, a SQL Update overwrites the row with the new action message data. If no row is available, a SQL Insert adds the action data at the end of the table. The function returns the rowid to the calling program.
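The row re-use logic can be illustrated with an in-memory model in Go. This is a sketch under assumptions, not the actual database code: a slice stands in for the SQLite table, the QueueRow type is reduced to three fields, and the new id is simply the highest id plus one. Changing a row in the slice corresponds to the SQL Update, appending to the slice corresponds to the SQL Insert.

```go
package main

import "fmt"

// QueueRow is a reduced, in-memory stand-in for a row of the queue table.
type QueueRow struct {
	ID     int
	Status byte
	Action string
}

// addNewQueueRow re-uses the Available ('A') row with the lowest id, or
// appends a new row when no row is available. It returns the updated
// table and the id of the row that now holds the action.
func addNewQueueRow(table []QueueRow, action string) ([]QueueRow, int) {
	best := -1
	for i, r := range table {
		if r.Status == 'A' && (best == -1 || r.ID < table[best].ID) {
			best = i
		}
	}
	if best >= 0 { // corresponds to the SQL Update
		table[best].Status = 'N'
		table[best].Action = action
		return table, table[best].ID
	}
	id := 1 // corresponds to the SQL Insert
	for _, r := range table {
		if r.ID >= id {
			id = r.ID + 1
		}
	}
	table = append(table, QueueRow{ID: id, Status: 'N', Action: action})
	return table, id
}

func main() {
	table := []QueueRow{
		{ID: 1, Status: 'P', Action: "nodeid=6&set=on"},
		{ID: 2, Status: 'A'},
	}
	table, id := addNewQueueRow(table, "nodeid=bll&set=on")
	fmt.Println("re-used row:", id)
	table, id = addNewQueueRow(table, "nodeid=6&set=off")
	fmt.Println("inserted row:", id, "table size:", len(table))
}
```

Re-using rows keeps the table from growing without bound, which is the circular buffer idea applied to a database table.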
It is possible to store repeating action messages in the queue table. By means of the fields repeat and counter these repeating actions
are defined. The options are:
D Daily
W Weekly
M Monthly
Y Yearly
h hourly
m minutes (Seconds are not implemented)
The repeat letter is followed by a number. The format is for instance D02, meaning today plus 2 days.
The field counter contains the number of times the given action has to be executed.
For instance, with a counter value of 10 the action is executed 10 times. A counter value of zero (0) means that the action is repeated
continuously according to the repeat value.
A repeat value of 'M01' then means that the action is repeated each month.
The column system determines which subsystem deals with the action message: 'ZW' stands for the ZWave zwwebsrvr.js,
'PW' for the Plugwise Stretch system, 'RF' for RFLinkHandler, and 'RUN' for a Windows or Linux process/program.
The column worker defines which worker program or app executes the action message.
The field action defines which action has to be executed. The format is identical to the HTTP GET request I use.
Some examples:
ZW | nodeid=6&set=on | Switch on the porch lights |
PW | nodeid=bll&set=on | Switch on my bedside table light |
RUN | program_name and [args] | Starts a program or app in a subprocess |
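Because the ZW and PW actions use the HTTP GET query format, a worker written in Go could split them with the standard net/url package. The following is a minimal sketch; the function name parseAction is hypothetical, and only the parameter names nodeid and set come from the examples above.

```go
package main

import (
	"fmt"
	"net/url"
)

// parseAction splits an action string in HTTP GET query format, such as
// "nodeid=6&set=on", into its nodeid and set values.
func parseAction(action string) (nodeID, set string, err error) {
	values, err := url.ParseQuery(action)
	if err != nil {
		return "", "", err
	}
	nodeID = values.Get("nodeid")
	set = values.Get("set")
	if nodeID == "" || set == "" {
		return "", "", fmt.Errorf("action %q lacks nodeid or set", action)
	}
	return nodeID, set, nil
}

func main() {
	for _, action := range []string{"nodeid=6&set=on", "nodeid=bll&set=on"} {
		nodeID, set, err := parseAction(action)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("node:", nodeID, "set:", set)
	}
}
```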
In the implementation I use a shortcode for the full names of the lights, like (in Dutch)
- bll or bed...= bedleeslamp (bedside table lamp)
- llw or leesl... = leeslamp woonkamer (living room)
- slw or schemer... = schemerlamp woonkamer (living room)
New worker actions can be added to the queue table by means of a webform. The relevant data is posted to the webserver adwebsrvr.go by means of an HTTP POST request. This webserver is developed in the Go (Golang) programming language. A new version of my prototype Winbatch worker adworker.wbt is also written in Go. For more details see my article about this experiment, to be published in the coming month.
Summary
The main goal of this article was to focus on the design and use of simple working queue systems in my domotica system.
Licenses and copyright
Licenses of the used software components.
- Golang:
Go code is licensed under a BSD license.
- Nodejs:
For information on the Nodejs license see for instance their website.
© Copyright 2020 by J.W. Teunisse
This piece of software as presented in this article is provided 'as-is', without any express or implied warranty. In no event will the author be held liable for any damages arising from the use of this article or software (code).
Comments or advice
Your comments or advice for improvement are most welcome, you can send them to the following email-address
pr@jwteunisse.nl