This repository has been archived by the owner on Apr 5, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
220 lines (190 loc) · 7.11 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import RPi.GPIO as GPIO
import time
import datetime
import os
#from balena import Balena
import sys
import paho.mqtt.client as mqtt
import json
import threading
import signal
import socket
pulse_per_second= 0
pulse_count = 0
pulse_output = {
"uuid": os.environ.get('BALENA_DEVICE_UUID'),
"gpio": 0,
"pulse_per_second": 0,
"pulse_per_minute": 0,
"pulse_per_hour": 0,
"pulse_count": 0,
"pps_mult": 0,
"ppm_mult": 0,
"pph_mult": 0
}
env_vars = {
"pulse_multiplier": 1,
"gpio_pin": 37,
"bounce_time": 200,
"mqtt_address": "none",
"gpio_reset_pin": 38,
"enable_webserver": 0,
"pull_up_down": "down"
}
sum_queue = []
pulse_multiplier = 0
client = mqtt.Client()
# Use the sdk to get services (eventually)
def mqtt_detect():
return False
# Simple webserver
def background_web(server_socket):
global pulse_output
while True:
# Wait for client connections
client_connection, client_address = server_socket.accept()
# Get the client request
request = client_connection.recv(1024).decode()
print(request)
# Send HTTP response
response = 'HTTP/1.0 200 OK\n\n'+ json.dumps(pulse_output)
print(pulse_output)
client_connection.sendall(response.encode())
client_connection.close()
class ProgramKilled(Exception):
"""
An instance of this custom exception class will be thrown every time we get an SIGTERM or SIGINT
"""
pass
# Raise the custom exception whenever SIGINT or SIGTERM is triggered
def signal_handler(signum, frame):
raise ProgramKilled
# This method fires on edge detection from a reset button
def on_reset(channel):
global pulse_count
print("pulse reset detected")
pulse_count = 0
# This function serves as the callback triggered on every run of our IntervalThread
def action() :
global pulse_per_second, sum_queue, client, env_vars, pulse_output
pulse_per_minute = 0
pulse_per_hour = 0
pulse_multiplier = env_vars["pulse_multiplier"]
sum_queue.append(pulse_per_second)
if len(sum_queue) > 1:
pulse_per_minute = sum(sum_queue[-60:])
pulse_per_hour = sum(sum_queue[-3600:])
if len(sum_queue) > 3600:
sum_queue.pop(0)
pulse_output["gpio"] = env_vars["gpio_pin"]
pulse_output["pulse_per_second"] = pulse_per_second
pulse_output["pulse_per_minute"] = pulse_per_minute
pulse_output["pulse_per_hour"] = pulse_per_hour
pulse_output["pulse_count"] = pulse_count
pulse_output["pps_mult"] = pulse_per_second * pulse_multiplier
pulse_output["ppm_mult"] = pulse_per_minute * pulse_multiplier
pulse_output["pph_mult"] = pulse_per_hour * pulse_multiplier
#print(pulse_output)
if env_vars["mqtt_address"] != "none":
client.publish('pulse_data', json.dumps(pulse_output))
pulse_per_second = 0
# See https://stackoverflow.com/questions/2697039/python-equivalent-of-setinterval
class IntervalThread(threading.Thread) :
def __init__(self,interval,action, *args, **kwargs) :
super(IntervalThread, self).__init__()
self.interval=interval
self.action=action
self.stopEvent=threading.Event()
self.start()
def run(self) :
nextTime=time.time()+self.interval
while not self.stopEvent.wait(nextTime-time.time()) :
nextTime+=self.interval
self.action()
def cancel(self) :
self.stopEvent.set()
def main():
global pulse_per_second, pulse_count, env_vars, client
# device variables
env_vars["pulse_multiplier"] = float(os.getenv('PULSE_MULTIPLIER', '1'))
env_vars["gpio_pin"] = os.getenv('GPIO_PIN', 37)
env_vars["bounce_time"] = os.getenv('BOUNCE_TIME', 0)
env_vars["mqtt_address"] = os.getenv('MQTT_ADDRESS', 'none')
env_vars["gpio_reset_pin"] = os.getenv('GPIO_RESET_PIN', 38)
env_vars["enable_webserver"] = os.getenv('ALWAYS_USE_HTTPSERVER', 0)
env_vars["pull_up_down"] = os.getenv('PULL_UP_DOWN', 'NONE')
if env_vars["enable_webserver"] == "1":
env_vars["enable_webserver"] = "True"
else:
env_vars["enable_webserver"] = "False"
GPIO.setmode(GPIO.BOARD)
gpio_pin = int(env_vars["gpio_pin"])
# Set the pin for the incoming pulse
if env_vars["pull_up_down"] == "UP":
GPIO.setup(gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
test_cond = GPIO.LOW
elif env_vars["pull_up_down"] == "DOWN":
GPIO.setup(gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
test_cond = GPIO.HIGH
else:
GPIO.setup(gpio_pin, GPIO.IN)
test_cond = GPIO.HIGH
if env_vars["bounce_time"] == 0:
bounce_time = 0
else:
if str(env_vars["bounce_time"]).isnumeric():
bounce_time = int(env_vars["bounce_time"])/1000
print("Bounce time set to {0} second(s)".format(bounce_time))
else:
bounce_time = 0
# Set the pin for pulse count reset
GPIO.setup(int(env_vars["gpio_reset_pin"]), GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.add_event_detect(int(env_vars["gpio_reset_pin"]), GPIO.FALLING, callback=on_reset, bouncetime=200)
if mqtt_detect() and env_vars["mqtt_address"] == "none":
env_vars["mqtt_address"] = "mqtt"
if env_vars["mqtt_address"] != "none":
print("Starting mqtt client, publishing to {0}:1883".format(env_vars["mqtt_address"]))
try:
client.connect(env_vars["mqtt_address"], 1883, 60)
except Exception as e:
print("Error connecting to mqtt. ({0})".format(str(e)))
env_vars["mqtt_address"] = "none"
env_vars["enable_webserver"] = "True"
else:
client.loop_start()
else:
env_vars["enable_webserver"] = "True"
if env_vars["enable_webserver"] == "True":
SERVER_HOST = '0.0.0.0'
SERVER_PORT = 7575
# Create socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((SERVER_HOST, SERVER_PORT))
server_socket.listen(1)
print("HTTP server listening on port {0}...".format(SERVER_PORT))
t = threading.Thread(target=background_web, args=(server_socket,))
t.start()
# Handle SIGINT and SIFTERM with the help of the callback function
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# start action every 1s
inter=IntervalThread(1,action)
# See https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
while True:
try:
GPIO.wait_for_edge(gpio_pin, GPIO.FALLING)
if bounce_time > 0:
time.sleep(bounce_time)
if GPIO.input(gpio_pin) == test_cond:
pulse_per_second = pulse_per_second + 1
pulse_count = pulse_count + 1
else:
pulse_per_second = pulse_per_second + 1
pulse_count = pulse_count + 1
except ProgramKilled:
print("Program killed: running cleanup code")
inter.cancel()
break
if __name__ == "__main__":
main()