My dad lives alone in an apartment. He was born in 1925 and is now a little wobbly on his feet. We already lost my mom a few years ago, after she wandered around during the night (advanced stage of Alzheimer's) and fell and broke her hip while in a nursing home. Recently, a lady next door to him was found after lying on the floor for 14 hours. She fell and could not get to the alarm button or could not operate it. My dad has one of those buttons too, and he claims to have it on him all the time, certainly after the mishap with his neighbor. A number of years ago, an aunt of my wife fell badly in her home, and broke her arm and ankle. She could not get to the telephone, and was only found after almost two days. Somebody I know from our tennis club, also living alone, had a heart attack while sitting in his chair, and it took 4 days before they found out. I want to spare my dad (and us) the same ordeal.
A little while ago, I stumbled on a website from a guy who is an expert on Computer Vision. He publishes many applications that use a camera and software to do recognition and tracking of objects. This piqued my interest. I had an RPi camera in my stash for at least two years already, with the intent to build a burglar alarm. The applications I found were using Motion, but I didn't think much of it, so the camera was left in the box.
Here is the website where I found many interesting applications and learned a lot : https://www.pyimagesearch.com
In the process of learning I downloaded the code and tried many posts. After a while I zoomed in on a post that counted people.
https://www.pyimagesearch.com/2018/08/13/opencv-people-counter/
It looked like the right thing for me. The reason I picked this particular application is as follows.
I want to measure movement, not presence. Imagine the situation where my dad sits in a chair, or falls and lies on the ground. Knowing that he's there does not help him, nor me. I want to know that he is moving around in the apartment.
So tracking him crossing an imaginary line seemed to be the right approach. I changed the code from the application to have a vertical line he needs to cross, and I tried several iterations to get it to work. The problem I ran into was speed. I'm using an RPi Model 3 B+ and that is simply not fast enough to reliably recognize my dad as a "person" and track him. When the RPi Model 4 came out, I tried that and sadly, even that is too slow for this application.
I tried many things to speed things up, but the RPi was already running flat out on all cylinders (4 cores). Just when I was about to shelve the project, I found another post on the website that rejuvenated my interest. This post was about the recently launched device from Google, the Coral TPU USB accelerator.
Here is that post : https://www.pyimagesearch.com/2019/04/22/getting-started-with-google-corals-tpu-usb-accelerator/
I read all I could find about this device on the internet, which is not much at the moment. There was a very impressive YouTube video where Google folks showed what this device could do, and so I bit the bullet and ordered one.
It arrived after a few days, and I quickly installed the driver software and tried the demo programs. They all worked flawlessly, so my initial doubts were quickly diminishing.
I then started a new app using this device, while retaining some of the structure and elements I used before. I was amazed at the speed. There is no longer a need to do recognition every 40 frames and use the lighter, CPU-friendly tracking method in between. The Coral runs so fast that I can use recognition on every frame and have a very smooth display of movement. I use htop version 2.2 to track the activity on the cores, and also track the core temperature and speed. It now runs with less than 40% load and the core temperature never goes much above 60-70 degrees C. When my kit is installed in a cabinet, it may get warmer, so I'm using a temperature driven fan. This is described on this forum as well. By the way, I'm running the Coral at the normal speed, there is no need to go faster.
Needless to say, I am very enthusiastic about this solution.
As you can imagine, I gently introduced my dad to the idea of using a device that would track his movement. He knows I'm not taking a video or invading his personal space. He was fine with the idea, and I have shown him a few early prototypes to get him used to it. After I finished the version with the Coral TPU, I took the kit over to his apartment to do a first tryout there, and see if I could really do what I wanted to do. I also needed to find a nice spot for the RPi and the camera.
Things worked very well right from the start, so we were both enthusiastic. I also found a nice place to put the stuff. He has a flat screen TV that is located in a cabinet. On the back is an unused screw hole that is there to mount a wall frame to the TV. I can use that to mount the RPi case and the Coral TPU. I can then use double sided tape to attach the camera enclosure on top of the TV. The location is perfect. The camera is facing an area in his living room where he needs to walk through to get to the kitchen, front door, or bathroom.
I'm currently running full-blown tests in my own "lab", to make sure everything works.
I'll be updating things as I get further into the deployment.
Because the RPi is located in a cabinet, together with some other equipment, mainly video, it gets a little warmer than I like. I concocted a little board that sits on top of the RPi P1 GPIO connector, and drives the fan, the buzzer, the shutdown button (just for emergencies), the stop_app button and the stop_app LED.
The fan I use is a favourite of mine. I use it on all my RPi's that need it. It's small and very silent. Not cheap though! The model is the Noctua NF-A4x10 5V. It comes with silent mounting parts and a number of cables, that I don't use. The buzzer is one of those small round things that can be had for a very small price.
I did not take the time to find the 3D model for the P1-connector. It is obviously the 20 pin female part. I have a number of them in my stash, so I used that. It fits on all RPi models.
To make sure the RPi does not get hung-up, I use the watchdog. Have a look here for more details :
https://www.raspberrypi.org/forums/viewtopic.php?f=29&t=147501
It's a post that I just recently upgraded with the latest information.
After a few weeks of use, I made a number of adjustments and improvements. In the meantime, I also put the code up on Github. Only the code posted there shows the latest versions.
https://github.com/paulvee/dad-watcher
Have fun!
If you like what you see, please support me by buying me a coffee: https://www.buymeacoffee.com/M9ouLVXBdw
********************************************************************************
Here are some explanations of the program and its features.
Movements are tracked, and when there is no movement for a longer period, I sound a beeper with an increasing loudness to alert my dad that he needs to cross the eye, or I will get a message. The reason I'm doing this is to make sure I limit the number of messages, and if something happens to him, he knows that help will be on the way pretty soon.
The message I get will be an SMS to my smartphone. I can then first call him to see what is going on, or alert the local help desk in the apartment building. Next step is for myself to get over there as fast as I can. I can be there in 10-15 minutes. If I can't, I can call my brother or sister and let them know what is going on.
The tiny beeper is connected through a transistor to a GPIO pin and is fed by the 5V supply of the P1 connector.
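To get a feel for how long the alarm phase lasts before a message goes out, here is a small simulation without any GPIO. It assumes the timing used in the main script further down (a beep that gets 0.1 seconds longer every round, followed by a 5 second pause) and the default alarm_time of 60 seconds. The function name simulate_alarm is made up for this sketch.

```python
def simulate_alarm(alarm_time=60, pause=5.0, step=0.1):
    """Return (number_of_beeps, elapsed_seconds) for an alarm nobody interrupts."""
    elapsed = 0.0
    beeps = 0
    while True:
        beeps += 1
        elapsed += step * beeps   # beeper on, a little longer every round
        elapsed += pause          # beeper off
        # only whole seconds are compared, like timedelta.seconds in the script
        if int(elapsed) > alarm_time:
            return beeps, round(elapsed, 1)


if __name__ == "__main__":
    beeps, seconds = simulate_alarm()
    print("beeps: {}  duration: {} s".format(beeps, seconds))
```

With these defaults my dad gets 11 beeps spread over a little more than a minute before the SMS is sent.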
There is no need to run this program the whole day. I decided to activate the application several times during the day when he is most likely to be home. I use cron to start and stop the application. When the app is running, activities get logged into a file. At the end of every day this logfile is emailed to me. Initially I will collect a lot of data in this file, but can reduce it (set DEBUG to False in the configuration file) when the test phase is over.
I use a configuration file for key parameters that make it easy for me to change them while I'm at his place.
The file is a json formatted structure that allows comments.
The name of the file is conf.json
{
"DEBUG": true, // (true) add more information to the log file or (false)
"DAEMON": false, // if true, systemd is in control (no screen output), otherwise use the console
"SMS": false, // if true, send SMS
"TEST": true, // if true, beeps at recognition of person
"alarm_time": 60, // time the alarm beeper is running in seconds
"window": 3500, // time movement detection window in seconds (58 minutes)
"confidence": 0.8, // recognition value, the higher the more reliable
"twilio_account_sid": "", // for Twilio
"twilio_auth_token": "", // for Twilio
"from_cell": "", // Twilio cell number
"to_cell": "", // your target cell number
"whatsapp_from": "" // Twilio whatsapp cell number (the script reads this key); no comma after the last entry
}
DEBUG = True : activate a lot of logging
DAEMON = True : when the program runs normally, activated by cron. There is no screen output. With DAEMON = False, you can see the activity on the console used during testing.
SMS = True : will send out SMS messages, False will turn that off
alarm_time = number of seconds the beeper will sound just before the SMS goes out. Crossing the eye will reset the counter
window = number of seconds in which a movement must be registered, otherwise the alarm phase will start.
confidence = is used for the recognition factor for the model. I want it to be pretty high to avoid stray recognitions.
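The main script uses the json_minify module to strip the comments before parsing. If you want to check a configuration file on a machine where that module is not installed, something like the following should do. It is only a sketch with the standard library: load_commented_json and SAMPLE are names I made up here, and it only handles // comments, not /* */ blocks.

```python
import json
import re

# match either a complete JSON string or a // comment up to the end of the line
_TOKEN = re.compile(r'"(?:\\.|[^"\\])*"|//[^\n]*')


def load_commented_json(text):
    """Parse JSON text that may contain // comments outside of strings."""
    def keep_strings(match):
        token = match.group(0)
        # strings are put back untouched, comments are dropped
        return token if token.startswith('"') else ""
    return json.loads(_TOKEN.sub(keep_strings, text))


SAMPLE = '''
{
"DEBUG": true,      // more logging
"window": 3500,     // seconds
"confidence": 0.8   // last entry, so no trailing comma
}
'''

if __name__ == "__main__":
    conf = load_commented_json(SAMPLE)
    print(conf["window"], conf["confidence"])
```

Because strings are matched first, a value such as "http://example.com" is not mistaken for a comment. Note that plain JSON does not allow a comma after the last entry, with or without comments.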
********************************************************************************
I use systemd and cron to start and stop the dad_watch script and just cron for the send email scripts and app_stop.
Note that I use the root version of cron!
In order to do that, use sudo su and then crontab -e
Here is a list of the cron entries:
0 8 * * * /bin/systemctl start dad_watch.service
5 9 * * * /bin/systemctl stop dad_watch.service
0 11 * * * /bin/systemctl start dad_watch.service
5 12 * * * /bin/systemctl stop dad_watch.service
0 13 * * * /bin/systemctl start dad_watch.service
5 14 * * * /bin/systemctl stop dad_watch.service
0 16 * * * /bin/systemctl start dad_watch.service
5 17 * * * /bin/systemctl stop dad_watch.service
0 20 * * * /bin/systemctl start dad_watch.service
5 21 * * * /bin/systemctl stop dad_watch.service
0 22 * * * /usr/bin/python3 /home/pi/send_mail.py
0 23 * * * /usr/bin/python3 /home/pi/clear_stop_flag.py
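To double check a schedule like this before putting it in cron, the start and stop times can be written down as pairs and tested. This is just a helper sketch, not part of the project: WINDOWS, is_monitored and minutes_per_day are names I made up, and the times are copied from the entries above.

```python
from datetime import time

# (start, stop) pairs copied from the cron entries above
WINDOWS = [
    (time(8, 0), time(9, 5)),
    (time(11, 0), time(12, 5)),
    (time(13, 0), time(14, 5)),
    (time(16, 0), time(17, 5)),
    (time(20, 0), time(21, 5)),
]


def is_monitored(t):
    """True when the dad_watch service is supposed to be running at time t."""
    return any(start <= t < stop for start, stop in WINDOWS)


def minutes_per_day():
    """Total number of minutes per day that the service is running."""
    total = 0
    for start, stop in WINDOWS:
        total += (stop.hour * 60 + stop.minute) - (start.hour * 60 + start.minute)
    return total


if __name__ == "__main__":
    print(is_monitored(time(8, 30)), is_monitored(time(10, 0)))
    print("monitored minutes per day:", minutes_per_day())
```

Every session is 65 minutes (3900 seconds), which leaves room for one complete detection window of 3500 seconds plus the 60 second alarm phase.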
********************************************************************************
The following is a script that will allow the user to disable the running of the watch program in case he or she is not in the room for a longer period. An entry in cron will reset the state late in the evening, just in case it is forgotten.
#!/usr/bin/python3
#-------------------------------------------------------------------------------
# Name: check_app_stop.py
# Purpose: Check if a pushbutton was pressed to prevent services from running.
# If the button is pressed, a file in the home directory is
# created that systemd will check before starting the application.
#
# Author: paulv
#
# Created: 10-06-2019
# Copyright:
# Licence: <your licence>
#-------------------------------------------------------------------------------
import RPi.GPIO as GPIO
import subprocess
import time
import os
DEBUG = False
GPIO.setmode(GPIO.BCM) # use GPIO numbering
GPIO.setwarnings(False)
file_name= "/home/pi/do_not_run"
BUTTON = 24 # GPIO-24
# pulled-up to create an active low signal
GPIO.setup(BUTTON, GPIO.IN, pull_up_down=GPIO.PUD_UP)
LED = 23 # GPIO 23
GPIO.setup(LED, GPIO.OUT, initial=GPIO.LOW) # active high
def main():
    # when the program starts (after a boot), remove the file flag
    # if the file is still there
    if os.path.isfile(file_name):
        if DEBUG : print("@boot: removing file flag")
        subprocess.call(["rm {}".format(file_name)], shell=True, \
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        if DEBUG : i = 0
        while True:
            # set an interrupt on a falling edge and wait for it to happen
            GPIO.wait_for_edge(BUTTON, GPIO.FALLING)
            # if the interrupt happens, we'll land here
            if DEBUG:
                i += 1
                print ("Button press detected {} {}".format(i, GPIO.input(BUTTON)))
            # filter out short presses and glitches
            time.sleep(0.5)
            if GPIO.input(BUTTON) == 0:
                if DEBUG : print("button is really pressed")
                if not os.path.isfile(file_name):
                    if DEBUG : print("Stop application is requested, creating file flag")
                    subprocess.call(["touch {}".format(file_name)], shell=True, \
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    # turn the warning LED on
                    GPIO.output(LED, GPIO.HIGH)
                else:
                    if DEBUG : print("file is there, remove it")
                    subprocess.call(["rm {}".format(file_name)], shell=True, \
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    # turn the warning LED off
                    GPIO.output(LED, GPIO.LOW)
    except KeyboardInterrupt:
        if DEBUG : print("\nCtrl-C - Terminating")
    finally:
        # remove the file if still there
        if os.path.isfile(file_name):
            if DEBUG : print("shutdown: removing file flag")
            subprocess.call(["rm {}".format(file_name)], shell=True, \
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        GPIO.cleanup()

if __name__ == '__main__':
    main()
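The heart of this script is a toggle: the first button press creates the flag file, the next one removes it. That part can be tried on any PC without GPIO. The sketch below uses pathlib instead of calling rm and touch, and a temporary directory instead of /home/pi. The function name toggle_flag is made up for this example.

```python
from pathlib import Path
import tempfile


def toggle_flag(flag):
    """Create the flag file if it is missing, remove it if it is there.

    Returns True when the flag is set afterwards (warning LED on).
    """
    if flag.exists():
        flag.unlink()
        return False
    flag.touch()
    return True


# try it out in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    flag = Path(tmp) / "do_not_run"
    first_press = toggle_flag(flag)    # flag created
    second_press = toggle_flag(flag)   # flag removed again
    print(first_press, second_press)
```

In the real script the return value would decide whether the LED is switched on or off.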
********************************************************************************
This is the routine called by cron to clear the flag if it was set.
#!/usr/bin/python3
#-------------------------------------------------------------------------------
# Name: clear_stop_flag.py
# Purpose: Clear the manually set file flag that stops the application from
# running.
# This will be executed at cron around 23:00 hrs.
#
# Author: paulv
#
# Created: 10-06-2019
# Copyright:
# Licence: <your licence>
#-------------------------------------------------------------------------------
import subprocess
import os
DEBUG = False
file_name= "/home/pi/do_not_run"
def main():
    # remove the file flag if the file is still there
    if os.path.isfile(file_name):
        if DEBUG : print("removing file flag")
        subprocess.call(["rm {}".format(file_name)], shell=True, \
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)

if __name__ == '__main__':
    main()
********************************************************************************
Here is a quick and dirty email program that I use to send me the log file
#!/usr/bin/python3
#-------------------------------------------------------------------------------
# Name: send_mail.py
# Purpose: send an email with a file attachment
#
# Author: paulv
#
# Created: 25-04-2019
# Copyright: (c) paulv 2019
# Licence: <your licence>
#-------------------------------------------------------------------------------
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
def send_mail():
    fromaddr = "email address"
    toaddr = "your email address"
    msg = MIMEMultipart()
    msg['From'] = fromaddr
    msg['To'] = toaddr
    msg['Subject'] = "Dad_watch activity log file"
    body = "Today's log file"
    filename = "/home/pi/dad_watch.log"
    with open(filename, 'r') as f:
        attachment = MIMEText(f.read())
    attachment.add_header('Content-Disposition', 'attachment', filename=filename)
    msg.attach(attachment)
    msg.attach(MIMEText(body, 'plain'))
    server = smtplib.SMTP('smtp.gmail.com:587')
    server.starttls()
    server.login(fromaddr, "your password")
    text = msg.as_string()
    server.sendmail(fromaddr, toaddr, text)
    server.quit()

def main():
    send_mail()

if __name__ == '__main__':
    main()
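If you want to see what the message looks like before anything goes out over SMTP, the building part can be split off. This sketch uses the newer EmailMessage class from the standard library instead of the MIME classes above. The function name build_log_mail and the example addresses are made up, and nothing is sent.

```python
from email.message import EmailMessage


def build_log_mail(sender, recipient, log_text, log_name="dad_watch.log"):
    """Build (but do not send) a mail with the log text as an attachment."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = "Dad_watch activity log file"
    msg.set_content("Today's log file")
    # a str is attached as text/plain
    msg.add_attachment(log_text, filename=log_name)
    return msg


if __name__ == "__main__":
    mail = build_log_mail("pi@example.com", "me@example.com",
                          "08:00:01 INFO     movement 1\n")
    for part in mail.iter_attachments():
        print(part.get_filename())
```

The resulting object could be handed to the send_message() method of an smtplib.SMTP connection once the login is done.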
********************************************************************************
Below is the script that controls the fan to cool the SOC of the RPi.
It uses the actual temperature coming from the SOC and uses PWM to control the
speed of the fan.
#!/usr/bin/python3
#-------------------------------------------------------------------------------
# Name: run_fan.py
# Purpose: Use PWM to run a fan to keep the core temperature in check
# This program is managed by a systemd script
#
# Author: Paul Versteeg
#
# Created: 01-12-2013, modified june 2019
# Copyright: (c) Paul 2013, 2019
# Licence: <your licence>
#-------------------------------------------------------------------------------
import RPi.GPIO as GPIO
from time import sleep, strftime
import subprocess
import shlex
import string
import sys, os
import traceback
DEBUG = True
LOG_FILENAME = "/home/pi/dad_watch.log"
FAN_PIN = 17 # GPIO 17
GPIO.setwarnings(False) # when everything is working you could turn warnings off
GPIO.setmode(GPIO.BCM) # choose BCM numbering scheme.
# set GPIO port as output driver for the Fan
GPIO.setup(FAN_PIN, GPIO.OUT)
Fan = GPIO.PWM(FAN_PIN, 100) # create object Fan for PWM on FAN_PIN (GPIO 17) at 100 Hertz
Fan.start(0) # start Fan with 0 percent duty cycle (off)
delay = 30 # seconds of delay, testing the core every 30 seconds is OK
cool_baseline = 60 # start cooling from this temp in Celsius onwards
pwm_baseline = 40 # lowest PWM to keep the fan running
factor = 3 # multiplication factor
max_pwm = 100 # maximum PWM value
fan_running = False # helps to kick-start the fan
log_fan = False # used to limit the fan status conditions
def main():
    '''
    This program controls a Fan by using PWM.
    The Fan will probably not work below 40% duty-cycle, so that is the
    fan PWM baseline. The maximum PWM cannot be more than 100%.
    When the core temperature is above 60 'C, we will start to cool.
    When the core reaches 70 degrees, we would like to run the fan at max speed.
    To make the PWM related to the temperature, subtract the cool baseline from
    the actual temp, multiply the delta with the factor and add that to the
    baseline PWM.
    Note: with a factor of 3 this gives 70% at 70 degrees and 100% from
    80 degrees onwards. Use a factor of 6 to get 100% at 70 degrees.
    I have selected a PWM frequency of 100Hz to avoid high frequency noise, but
    you can change that.
    '''
    global fan_running, log_fan
    try:
        while True:
            # get the cpu temperature
            # need to use the full path otherwise root cannot find it
            cmd = "/opt/vc/bin/vcgencmd measure_temp"
            args = shlex.split(cmd)
            output, error = subprocess.Popen(args, stdout = subprocess.PIPE, \
                stderr= subprocess.PIPE).communicate()
            # strip the temperature out of the returned string
            # the returned string is in the form : b"temp=43.9'C\n"
            # if your localization is set to US, you get the temp in Fahrenheit,
            # so you need to adapt the stripping somewhat
            cpu_temp =float(output[5:9]) # for Celsius
            if cpu_temp < cool_baseline :
                Fan.ChangeDutyCycle(0) # turn Fan off
                fan_running = False
                if log_fan :
                    tstamp = strftime("%H:%M:%S")
                    subprocess.call(["echo ' {} *** Stopping fan' >> {}"\
                        .format(tstamp, LOG_FILENAME)], shell=True, \
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    log_fan = False
            if cpu_temp > cool_baseline :
                if fan_running :
                    duty_cycle = ((cpu_temp-cool_baseline)*factor)+pwm_baseline
                    if duty_cycle > max_pwm : duty_cycle = max_pwm # max = 100%
                else:
                    if not log_fan :
                        tstamp = strftime("%H:%M:%S")
                        subprocess.call(["echo ' {} *** Starting fan' >> {}"\
                            .format(tstamp, LOG_FILENAME)], shell=True, \
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                        log_fan = True
                    # kick-start the fan for one cycle
                    duty_cycle = 70
                    fan_running = True
                Fan.ChangeDutyCycle(duty_cycle) # output the pwm
                if DEBUG : print("core temp= {} pwm = {:.2f}".format(cpu_temp, duty_cycle))
            sleep(delay)
    # the following will allow you to kill the program, you can delete these lines if you want
    except KeyboardInterrupt:
        Fan.stop() # stop the PWM output
        GPIO.cleanup() # clean up GPIO on CTRL+C exit()
    except Exception as e:
        sys.stderr.write("Got exception: %s" % e)
        if DEBUG :
            print(traceback.format_exc())
            print("GPIO.cleanup")
        GPIO.output(FAN_PIN, GPIO.LOW)
        GPIO.cleanup(FAN_PIN) # leave the other GPIO-pins alone!
        os._exit(1)

if __name__ == '__main__':
    main()
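The relation between temperature and PWM is easier to check when it is pulled out of the loop. The sketch below does that with the same constants as the script (baseline 60 degrees, lowest PWM 40%, factor 3). It leaves out the kick-start and the logging, and it treats exactly 60 degrees as "fan off", which is a simplification. The function names parse_temp and duty_cycle are made up, and the regular expression is my alternative for the fixed slice output[5:9], so that a reading like 101.2 also works.

```python
import re

COOL_BASELINE = 60   # start cooling above this temperature (Celsius)
PWM_BASELINE = 40    # lowest PWM that keeps the fan running
FACTOR = 3           # multiplication factor
MAX_PWM = 100        # maximum PWM value


def parse_temp(raw):
    """Extract the temperature from vcgencmd output like temp=43.9'C"""
    text = raw.decode() if isinstance(raw, bytes) else raw
    match = re.search(r"temp=([0-9.]+)", text)
    if match is None:
        raise ValueError("unexpected vcgencmd output: {!r}".format(raw))
    return float(match.group(1))


def duty_cycle(cpu_temp):
    """Return the PWM duty cycle (0-100) for a given core temperature."""
    if cpu_temp <= COOL_BASELINE:
        return 0.0
    return min(MAX_PWM, (cpu_temp - COOL_BASELINE) * FACTOR + PWM_BASELINE)


if __name__ == "__main__":
    for temp in (59.0, 65.0, 70.0, 80.0):
        print("{:5.1f} C -> {:5.1f} %".format(temp, duty_cycle(temp)))
```

With a factor of 3 the fan runs at 70% at 70 degrees and only reaches 100% at 80 degrees. Change FACTOR to 6 if you want full speed at 70 degrees.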
********************************************************************************
Here is the current version of the main Python script.
Note that you must install the imutils, json-minify and twilio modules both as user pi (or whatever you use), and also as root. Otherwise Python cannot find the modules.
I use Twilio to send an SMS to my mobile. You can sign-up for a free account here : https://www.twilio.com/
#!/usr/bin/python3
#-------------------------------------------------------------------------------
# Name: dad_watch.py
# Purpose: Watching the activity of my dad, and send an SMS as warning when
# no activity has been detected for a while.
# This program is managed by a systemd script
#
# Author: Paul Versteeg, based on idea and blog from Adrian Rosebrock
# https://www.pyimagesearch.com/2018/08/13/opencv-people-counter/
# Modified:
#
# Created: 26-05-2019
# Copyright: (c) Paul Versteeg
# Licence: <your licence>
#-------------------------------------------------------------------------------
# import the necessary packages
from edgetpu.detection.engine import DetectionEngine
from imutils.video import VideoStream
from imutils.video import FPS
from PIL import Image
import argparse
import imutils #pip3 install imutils @ sudo for running as root
import warnings
import datetime
# pip3 install json-minify and also sudo
from json_minify import json_minify
import json
import time
import cv2
import numpy as np
import os
import os.path
import sys
import shlex
import subprocess
import traceback
from threading import Thread
import logging
import logging.handlers
# https://www.twilio.com/
from twilio.rest import Client # pip3 install twilio & sudo for running as root
import RPi.GPIO as GPIO
from multiprocessing import Process, Queue
import signal
VERSION ="2.2" # added json_minify to allow comments in the configuration file
# added Twilio vars to the conf file, load all conf vars at start
# change all references to static vars in the code.
#VERSION ="2.1" # removed gpio.cleanup's to avoid exception in sound_alarm at shutdown
# added ALARM check in timing deadline if statement
#filter warnings, load the configuration file
warnings.filterwarnings("ignore")
conf = json.loads(json_minify(open("/home/pi/conf.json").read()))
# load the configuration variables
DEBUG = conf["DEBUG"]
DAEMON = conf["DAEMON"]
SMS = conf["SMS"]
TEST = conf["TEST"]
alarm_time = conf["alarm_time"]
window = conf["window"]
confidence = conf["confidence"]
twilio_account_sid = conf["twilio_account_sid"] # for Twilio
twilio_auth_token = conf["twilio_auth_token"] # for Twilio
from_cell= conf["from_cell"] # Twilio cell number
to_cell = conf["to_cell"] # target cell number
whatsapp_from = conf["whatsapp_from"] # Twilio whatsapp cell number
if DEBUG :
print ("DEBUG is {}".format(DEBUG))
print ("DAEMON is {}".format(DAEMON))
print("SMS is {}".format(SMS))
print("TEST is {}".format(TEST))
print("confidence {}".format(confidence))
print("alarm_time is {}".format(alarm_time))
print("window is {}".format(window))
print("twilio_account_sid is {}".format(twilio_account_sid))
print("twilio_auth_token is {}".format(twilio_auth_token))
print("from_cell is {}".format(from_cell))
print("to_cell is {}".format(to_cell))
print("whatsapp_from is {}".format(whatsapp_from))
BEEPER = 4 # GPIO 4
ALARM = False # alarm flag
ALARM_RUNNING = False
#--logger definitions
# save daily logs for 7 days
# the logfile will be mailed daily
LOG_FILENAME = "/home/pi/dad_watch.log"
LOG_LEVEL = logging.DEBUG if DEBUG else logging.INFO
logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)
handler = logging.handlers.TimedRotatingFileHandler(LOG_FILENAME, when="midnight", backupCount=7)
formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
class MyLogger():
    '''
    A class that can be used to capture stdout and stderr to put it in the log
    '''
    def __init__(self, level, logger):
        '''Needs a logger and a logger level.'''
        self.logger = logger
        self.level = level

    def write(self, message):
        # Only log if there is a message (not just a new line)
        if message.rstrip() != "":
            self.logger.log(self.level, message.rstrip())

    def flush(self):
        pass # do nothing -- just to handle the attribute for now

# --Replace stdout and stderr with logging to file so we can run it as a daemon
# and still see what is going on
if DAEMON :
    sys.stdout = MyLogger(logging.INFO, logger)
    sys.stderr = MyLogger(logging.ERROR, logger)
def get_cpu_temp():
    # get the cpu temperature
    # need to use the full path, otherwise root cannot find it
    cmd = "/opt/vc/bin/vcgencmd measure_temp"
    args = shlex.split(cmd)
    output, error = subprocess.Popen(args, stdout = subprocess.PIPE, \
        stderr= subprocess.PIPE).communicate()
    # strip the temperature out of the returned string
    # the returned string is in the form : b"temp=43.9'C\n"
    # if your localization is set to US, you get the temp in Fahrenheit,
    # so you need to adapt the stripping somewhat
    #
    cpu_temp =float(output[5:9]) # for Celsius
    return(cpu_temp)
def init():
    global labels, model, vs, fps
    # set the GPIO mode
    GPIO.setmode(GPIO.BCM)
    if DEBUG:
        GPIO.setwarnings(True)
    else:
        GPIO.setwarnings(False)
    # set GPIO port as output driver for the beeper
    GPIO.setup(BEEPER, GPIO.OUT)
    # the core temp must be at the lowest at startup, so report it.
    logger.info("Core temp is {} degrees C".format(get_cpu_temp()))
    # initialize the labels dictionary
    print("parsing class labels...")
    labels = {}
    # loop over the class labels file
    for row in open("/home/pi/edgetpu_models/coco_labels.txt"):
        # unpack the row and update the labels dictionary
        (classID, label) = row.strip().split(maxsplit=1)
        labels[int(classID)] = label.strip()
    # load the Google Coral tpu object detection model
    print("loading Coral model...")
    model = DetectionEngine("/home/pi/edgetpu_models/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite")
    # initialize the video stream and allow the camera sensor to warmup
    print("starting video stream...")
    vs = VideoStream(src=0).start() # webcam
    # vs = VideoStream(usePiCamera=True).start() # Picam
    # let the camera sensor warm-up
    time.sleep(2.0)
    if not DAEMON :
        # start the frames per second throughput estimator
        fps = FPS().start()
    beep()
def send_sms(msg='no txt'):
    account_sid = twilio_account_sid
    auth_token = twilio_auth_token
    try:
        client = Client(account_sid, auth_token)
        message = client.messages.create(
            body = msg,
            from_= from_cell,
            to = to_cell)
        logger.debug("sid = {}".format(message.sid))
    except Exception as e:
        logger.error("Unexpected Exception in send_sms() : \n{}".format(e))
    return

def beep():
    GPIO.output(BEEPER, GPIO.HIGH)
    time.sleep(0.05)
    GPIO.output(BEEPER, GPIO.LOW)
    return
def sound_alarm():
    '''
    Function to create a separate thread to start an alarm beep.
    The alarm will be stopped when movement has been detected again or
    when the total alarm time has been exceeded.
    '''
    global alarm_thread, ALARM_RUNNING, ALARM, SMS
    try:
        class alarm_threadclass(Thread):
            def run(self):
                global alarm_thread, ALARM_RUNNING, ALARM, SMS
                try:
                    start_time = datetime.datetime.now()
                    i = 1
                    while ALARM :
                        ALARM_RUNNING = True
                        GPIO.output(BEEPER, GPIO.HIGH)
                        time.sleep(0.1*i)
                        GPIO.output(BEEPER, GPIO.LOW)
                        time.sleep(5)
                        i += 1
                        max_alarm = (datetime.datetime.now() - start_time).seconds
                        # add ALARM in the test to catch a change during the sleep period
                        if (max_alarm > alarm_time and ALARM) :
                            logger.info("*** no movement during alarm, sending SMS")
                            if SMS :
                                send_sms("no movement during alarm phase")
                                SMS = False # send only one SMS per session
                            break
                    ALARM_RUNNING = False
                    logger.info("alarm thread ended")
                    ALARM = False
                    return
                except Exception as e:
                    logger.error("Unexpected Exception in sound_alarm :\n{0} ".format(e))

        alarm_thread = alarm_threadclass()
        alarm_thread.setDaemon(True) # so a ctrl-C can terminate it
        if not ALARM_RUNNING :
            alarm_thread.start() # start the thread
            logger.info("sound_alarm thread started")
    except Exception as e:
        logger.error("Unexpected Exception in sound_alarm() : \n{0}".format(e))
    return
def sig_handler (signum=None, frame = None):
    '''
    This function will catch the most important system signals, but NOT a shutdown!
    This handler catches the following signals from the OS:
    SIGHUP = (1) SSH Terminal logout
    SIGINT = (2) Ctrl-C
    SIGQUIT = (3) ctrl-\\
    IOerror = (5) when terminating the SSH connection (input/output error)
    SIGTERM = (15) Daemon terminate (daemon --stop): is coming from systemd
    However, it cannot catch SIGKILL = (9), the kill -9 or the shutdown procedure
    '''
    try:
        logger.info("Sig_handler called with signal : {}".format(signum))
        if signum == 1 :
            return # ignore SSH logout termination
        # the core temp must be at the highest now, so report it.
        logger.info("Core temp is {} degrees C".format(get_cpu_temp()))
        logger.info("Terminating \n")
        beep()
        time.sleep(0.5)
        beep()
        if not DAEMON:
            # stop the fps timer and display the collected results
            fps.stop()
            print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
            print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
        # do a bit of cleanup
        GPIO.output(BEEPER, GPIO.LOW) # force the beeper to quit
        if not DAEMON :
            cv2.destroyAllWindows()
        vs.stop()
        time.sleep(1)
        os._exit(0) # force the exit to the OS
    except Exception as e: # IOerror 005 when terminating the SSH connection
        logger.error("Unexpected Exception in sig_handler() : \n{0}".format(e))
        return
def main():
    global ALARM
    logger.info('\n\n dad_watch version %s' % VERSION)
    # initialize the frame dimensions (we'll set them as soon as we read
    # the first frame from the video)
    W = None
    H = None
    startX = 0
    startY = 0
    endX = 0
    endY = 0
    dir = None # direction
    side = None
    old_side = None
    movement = 0
    init()
    boot_up= True # to avoid a false positive at startup
    # setup a catch for the following signals: signal.SIGINT = ctrl-c
    for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP, signal.SIGQUIT):
        signal.signal(sig, sig_handler)
    # setup the movement timers
    lastSeen = datetime.datetime.now()
    lastSeen_ts = datetime.datetime.now()
    try:
        # loop over the frames from the video stream
        while True:
            # setup the presence timers
            timestamp = datetime.datetime.now()
            lastSeen_ts = datetime.datetime.now()
            interval = (lastSeen_ts - lastSeen).seconds
            # grab the frame from the threaded video stream and resize it
            # to have a maximum width of 500 or 800 pixels during testing
            frame = vs.read()
            if DAEMON :
                frame = imutils.resize(frame, width=500)
            else:
                frame = imutils.resize(frame, width=800)
            orig = frame.copy()
            # if the frame dimensions are empty, set them
            if W is None or H is None:
                (H, W) = frame.shape[:2]
                logger.debug("[INFO] H= {} W= {}".format(H,W))
                # Create a dividing line in the center of the frame
                # It is used to determine movement of the objects
                centerline = W // 2
            # make sure the alarm does not run indefinitely
            if (interval > (window + 100) and ALARM == True) :
                logger.info("reset the timer interval")
                lastSeen = datetime.datetime.now()
                ALARM = False
            # check if we went past the movement timing deadline...
            if ((interval > window) and (ALARM == False)):
                ts = timestamp.strftime("%A %d %B %Y %H:%M:%S")
                logger.info("*** no movement alarm")
                ALARM = True
                sound_alarm()
            # prepare the frame for object detection by converting it
            # from BGR to RGB channel ordering and then from a NumPy
            # array to PIL image format
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = Image.fromarray(frame)
            # do the heavy lifting on the Coral USB tpu
            results = model.DetectWithImage(frame, threshold=confidence,
                keep_aspect_ratio=True, relative_coord=False)
            # loop over the results
            for r in results:
                # use the index to get at the proper label
                label = labels[r.label_id]
                # we're only interested in persons
                if label != "person" :
                    continue
                # extract the bounding box and predicted class label
                box = r.bounding_box.flatten().astype("int")
                (startX, startY, endX, endY) = box
                # calculate the middle of the object
                position = (startX + endX) // 2
                # The centerline is in the middle of the frame (W // 2)
                # Determine if the object movement is across the centerline
                # If the middle of the object AND the right-hand side (endX) has
                # crossed, there was a complete move to the left.
                # If the middle of the object AND the left-hand side (startX) has
                # crossed, there was a complete move to the right.
                if position < centerline and endX < centerline :
                    side = "left"
                    # print("moving left")
                if position > centerline and startX > centerline :
                    side = "right"
                    # print("moving right")
                # if there is a change in the side, record it as a movement
                # avoid a false positive at startup
                if boot_up :
                    old_side = side
                    boot_up = False
                # compare the values with != (the identity test "is not" only
                # works by accident for strings)
                if side != old_side:
                    movement += 1
                    if TEST : beep()
                    logger.info("movement {}".format(movement))
                    # reset the counter & alarm flag
                    lastSeen = datetime.datetime.now()
                    if (ALARM == True) :
                        ALARM = False
                    old_side = side
                if not DAEMON :
                    # draw the bounding box and label on the image
                    cv2.rectangle(orig, (startX, startY), (endX, endY),
                        (0, 255, 0), 2)
                    y = startY - 15 if startY - 15 > 15 else startY + 15
                    text = "{}: {:.2f}%".format(label, r.score * 100)
                    cv2.putText(orig, text, (startX, y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
            if not DAEMON :
                # show the output frame and wait for a key press
                cv2.imshow("Frame", orig)
                key = cv2.waitKey(1) & 0xFF
                # if the `q` key was pressed, break from the loop
                if key == ord("q"):
                    break
                # update the FPS counter
                fps.update()
    except KeyboardInterrupt:
        if (DEBUG) and (not DAEMON): print ("\nCtrl-C Terminating")
    except Exception as e:
        sys.stderr.write("Got exception: %s" % e)
        if (DEBUG) and (not DAEMON): print(traceback.format_exc())
        logger.error("exception in main() \n {}".format(traceback.format_exc()))
    finally:
        if not DAEMON :
            # stop the timer and display FPS information
            fps.stop()
            print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
            print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
        # do a bit of cleanup
        print("GPIO.cleanup")
        GPIO.output(BEEPER, GPIO.LOW) # force beeper to quit
        GPIO.cleanup() # cleanup does not really force the output to zero
        cv2.destroyAllWindows()
        vs.stop()

if __name__ == '__main__':
    main()
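The movement test in the main loop can be tried out without a camera or the Coral. The sketch below is a simplified version of it, under a few assumptions: only the horizontal box coordinates are used, the names side_of and count_crossings are made up, and the first side that is seen is never counted as a movement (the script handles the startup with the boot_up flag instead). Because the middle of a box always lies between startX and endX, testing endX or startX alone gives the same answer as the test in the script.

```python
def side_of(start_x, end_x, centerline):
    """Return 'left' or 'right' when the whole box is on one side, else None."""
    if end_x < centerline:
        return "left"
    if start_x > centerline:
        return "right"
    return None   # the box straddles the centerline


def count_crossings(boxes, centerline):
    """Count the side changes over a sequence of (start_x, end_x) boxes."""
    side = None
    crossings = 0
    for start_x, end_x in boxes:
        new_side = side_of(start_x, end_x, centerline)
        if new_side is None:
            continue               # keep the last known side
        if side is not None and new_side != side:
            crossings += 1         # a complete move across the line
        side = new_side
    return crossings


if __name__ == "__main__":
    # a person walks from left to right and back again in a 500 pixel wide frame
    walk = [(10, 100), (150, 260), (240, 330), (300, 400), (120, 230), (20, 110)]
    print("crossings:", count_crossings(walk, centerline=250))
```

Somebody who sits still or lies on the floor produces boxes on one side only, so the count stays the same and the timer in the main loop keeps running.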
********************************************************************************
Here is the systemd profile. There is one special command that checks the presence of a file to decide to start the process. I use this to allow the user to prevent the system from running and possibly send out bogus SMS messages, if the user is not in the room for a longer period. If the file is present, the program will not start.
The service file is located here : /etc/systemd/system/dad_watch.service
# This service installs a python script that runs the dad_watch python script.
# The execution of the target program is depending on the absence of a file.
# When the script crashes, it is automatically restarted.
# If it crashes too many times, it will be forced to fail, or you can let systemd reboot
#
# use systemctl daemon-reload if you made changes to this file.
[Unit]
Description=Installing dad_watch script
Requires=basic.target
After=multi-user.target
# a file existence is checked before the unit is started
# the ! negates the test: the unit only starts when the file is NOT there
ConditionPathExists=!/home/pi/do_not_run
[Service]
ExecStart=/usr/bin/python3 /home/pi/dad_watch.py
Restart=always
# The number of times the service is restarted within a time period can be set
# If that condition is met, the RPi can be rebooted
#
StartLimitBurst=4
StartLimitInterval=180s
# actions can be none|reboot|reboot-force|reboot-immediate
StartLimitAction=none
# The following are defined in the /etc/systemd/system.conf file and are
# global for all services
#
#DefaultTimeoutStartSec=90s
#DefaultTimeoutStopSec=90s
#
# They can also be set on a per process here:
# if they are not defined here, they fall back to the system.conf values
#TimeoutStartSec=2s
#TimeoutStopSec=2s
[Install]
WantedBy=multi-user.target
********************************************************************************