-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathapp.js
145 lines (120 loc) · 4.04 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Entry Point for the LRS ReRoute Service.
//
// This is a NodeJS Express application
//
const http = require("http")
const WebSocket = require('ws');
const express = require("express");
const kafkaConsumer = require("simple-kafka-consumer");
const config = require("./config");
// Port the HTTP server listens on; the APP_PORT environment variable
// overrides the default of 3000.
const APP_PORT = process.env.APP_PORT || 3000;

// Password a monitoring socket has to present; the WS_PASSWORD environment
// variable overrides the value from the config file.
const WS_PASSWORD = process.env.WS_PASSWORD || config.websocket.password;

// The Express app is wrapped in an explicit HTTP server so the WebSocket
// server created further down can be attached to the same server.
const app = express();
const server = http.createServer(app);
/**
* The point of this page is to monitor the status of our Kafka cluster.
*
* To that end, our monitoring page will open a socket with this service
* to relay messages to our page.
*/
// Every currently connected monitoring socket, authenticated or not.
const openSockets = [];

// WebSocket endpoint served at <config.root>/kafka on the shared HTTP server.
const wss = new WebSocket.Server({
    path: config.root + "/kafka",
    server,
});

// Per-connection protocol:
//   * the first message must be the password, which marks the socket as
//     authenticated;
//   * "keep-alive" messages are always tolerated;
//   * any other message closes the socket.
wss.on("connection", function (ws) {
    // Track the socket so the keep-alive timer and broadcast() can reach it
    openSockets.push(ws);
    ws.authenticated = false;
    ws.noMessages = true;

    ws.on("message", function (data) {
        // The payload may arrive as a string or as a Buffer depending on the
        // ws version, so decode it once and compare text with strict equality.
        const text = String(data);
        const isPassword = WS_PASSWORD != null && text === String(WS_PASSWORD);

        if (ws.noMessages && isPassword) {
            ws.authenticated = true;
            console.log("AUTHENTICATED SOCKET");
        } else if (text !== "keep-alive") {
            ws.close();
            console.log("CLOSING: ", data);
        }
        ws.noMessages = false;
    });

    // An "error" event without a listener is thrown by EventEmitter and would
    // take the whole service down, so log it instead. Removal from
    // openSockets is left to the "close" handler below.
    ws.on("error", function (err) {
        console.log("SOCKET ERROR: ", err && err.message);
    });

    // If the socket is closed, stop sending messages to it
    ws.on("close", function close() {
        const index = openSockets.indexOf(ws);
        // indexOf yields -1 for an unknown socket, and splice(-1, 1) would
        // silently drop the last (unrelated) socket instead.
        if (index !== -1) {
            openSockets.splice(index, 1);
        }
    });

    // Send a message when they connect here
    ws.send('Connected to Kafka Web Socket Monitor!');
});
// Every 5 seconds send "keep-alive" to all authenticated sockets.
// This goes through broadcast() so that keep-alives and Kafka traffic share
// a single delivery path instead of two copies of the same loop.
setInterval(() => {
    broadcast("keep-alive");
}, 5000);
/**
 * Broadcast a message to each authenticated socket that is still open.
 *
 * Sockets that have not authenticated, or whose readyState is no longer
 * OPEN, are skipped. A socket that is closing remains in openSockets until
 * its "close" event removes it, so it can still be encountered here.
 * A failure while sending to one socket is logged and does not prevent
 * delivery to the remaining sockets.
 *
 * @param {string} message - Payload handed to each socket's send().
 */
function broadcast(message) {
    // Numeric value of the WebSocket OPEN ready state.
    const SOCKET_OPEN = 1;

    for (const socket of openSockets) {
        if (!socket.authenticated || socket.readyState !== SOCKET_OPEN) {
            continue;
        }
        try {
            socket.send(message);
        } catch (err) {
            // Keep going: one broken connection must not starve the others.
            console.log("SEND FAILED: ", err && err.message);
        }
    }
}
// Throttling state for the Kafka -> WebSocket relay.
// Messages received in the current throttle window.
let recentCount = 0;
// Once recentCount reaches this value, further messages in the window are not relayed.
const throttleCount = 100;
// Length of one throttle window in milliseconds.
const throttleTimerMS = 500;
// Whether the "throttled" notice has already been broadcast in this window.
let throttleWarned = false;
// Configure the consumer; environment variables take priority over the
// config file.
//
// The SASL fallbacks are parenthesised on purpose: `a || b ? c : d` parses as
// `(a || b) ? c : d`, which would discard the environment value and would
// dereference config.kafka.sasl even when it is absent.
kafkaConsumer.configure({
    brokers: process.env.KAFKA_BROKER || config.kafka.brokers.join(","),
    saslUser: process.env.KAFKA_SASL_USER || (config.kafka.sasl ? config.kafka.sasl.username : undefined),
    saslPass: process.env.KAFKA_SASL_PASS || (config.kafka.sasl ? config.kafka.sasl.password : undefined),
    topics: config.kafka.topics.map(topic => topic.name),
    consumerGroup: config.kafka.consumerGroup
});
// Relay consumer traffic to the web sockets, subject to the throttle.
//
// The callback receives (topic, offset, message). The message is handled as
// a string here (length / slice / interpolation).
// NOTE(review): presumably the decoded record value — confirm against
// simple-kafka-consumer.
//
// Once recentCount reaches throttleCount, a single notice is broadcast and
// every further message is dropped until the reset timer clears the counters.
kafkaConsumer.initConsumer((topic, offset, message) => {
    // Keep the console readable: log at most the first 100 characters.
    const preview = message.length > 100 ? message.slice(0, 100) + " ..." : message;
    console.log(topic, offset, preview);

    recentCount++;

    if (recentCount >= throttleCount) {
        if (!throttleWarned) {
            // Use the topic argument; the message itself has no `topic` property.
            broadcast(`[${topic}, # ${offset}, (throttled ${recentCount})]\n"High message rates will be throttled for performance with this page."`);
            throttleWarned = true;
        }
        return;
    }

    broadcast(`[${topic}, # ${offset}]\n${message}\n`);
});
// Open a new throttle window every throttleTimerMS milliseconds by clearing
// the message counter and re-arming the one-time "throttled" notice.
setInterval(() => {
    throttleWarned = false;
    recentCount = 0;
}, throttleTimerMS);
/**
 * Finally, set up the Express instance that serves the monitoring page.
 */
app.set("view engine", "ejs");

// The same three folders are served statically twice: first from the site
// root, then beneath config.root. Registration order matches the folder order.
const staticFolders = ["public", "scripts", "views"];
for (const folder of staticFolders) {
    app.use(express.static(folder));
}
for (const folder of staticFolders) {
    app.use(config.root, express.static(folder));
}

// Requests whose base URL already starts with config.root continue down the
// chain; everything else is redirected to a relative location built from
// config.root (minus its first character) followed by req.url.
app.use("*", (req, res, next) => {
    if (req.baseUrl.startsWith(config.root)) {
        next();
        return;
    }
    res.redirect(config.root.substr(1) + req.url);
});
// Main page: render the index.ejs view with the socket password, the
// configured root path and the configured topic list.
// NOTE(review): the password is handed to the template, presumably so the
// page can authenticate its own socket — confirm this exposure is intended.
app.get(config.root, (req, res) => {
    const pageData = {
        password: WS_PASSWORD,
        root: config.root,
        topics: config.kafka.topics,
    };
    res.render("index.ejs", pageData);
});
// Start listening on APP_PORT and log a line once the server is up.
server.listen(APP_PORT, () => {
    console.log("\nKafka Web Socket Example listening on port %s", APP_PORT);
});