log-viewer/backend/server.js

73 lines
2.2 KiB
JavaScript
Raw Normal View History

2022-10-10 23:24:16 +00:00
const { MongoClient, MongoClientOptions } = require("mongodb");
2022-02-09 11:56:34 +00:00
const express = require('express');
2022-02-15 08:55:59 +00:00
/*============== VARIABLE DECLARATION ==============*/
2022-02-09 11:56:34 +00:00
// Mongo set-up variables (each one can be overridden through the environment)
// Name of the collection whose documents are streamed to clients.
const mongoCollection = process.env.MONGO_COLLECTION || 'log';
// Connection string handed to the Mongo client.
const mongoUri = process.env.MONGODB_HOST || 'mongodb://127.0.0.1:27017/default?replicaSet=rs0';
// Number of most recent documents replayed to a new subscriber.
// The radix is explicit so the value is always read as decimal; a missing,
// non-numeric or zero value falls back to 50.
const historyNumber = Number.parseInt(process.env.HISTORY_AMOUNT, 10) || 50;
2022-02-09 11:56:34 +00:00
// Change-stream and HTTP server settings

// Aggregation pipeline handed to collection.watch(); empty here.
const pipeline = [];

// Options handed to collection.watch() together with the pipeline.
const options = {
  fullDocument: 'updateLookup',
};

// Assigned inside run() once the collection watch has been opened.
let changeStream;

// Port handed to app.listen(); taken from the environment when set.
const PORT = process.env.PORT || 3002;
// Create Mongo client
2022-10-10 23:23:54 +00:00
// Options handed to the MongoClient constructor.
const mongoOptions = { readPreference: 'secondaryPreferred' };

// Client instance built from the configured URI and the options above.
const mongoClient = new MongoClient(mongoUri, mongoOptions);
2022-02-09 11:56:34 +00:00
/*============== CODE ==============*/
/**
 * Starts the log-viewer backend.
 *
 * Connects the Mongo client, opens one change stream on the configured
 * collection and serves a Server-Sent Events endpoint at GET /events. Each
 * subscriber first receives the most recent `historyNumber` documents (oldest
 * of those first) and afterwards every change that carries a full document.
 *
 * @returns {Promise<void>} Resolves after the HTTP server has been asked to
 *   listen; rejects when connecting to MongoDB or any other set-up step throws.
 */
async function run() {
  console.log('server.js has been launched');

  const app = express();
  await mongoClient.connect();
  const collection = mongoClient.db().collection(mongoCollection);

  changeStream = collection.watch(pipeline, options);
  console.log("Started watching changes in database");

  /**
   * Writes one document to an SSE response as a `message` event.
   *
   * @param {object} response - Response object the event is written to.
   * @param {object} blob - Document to serialise; its `_id` becomes the event id.
   */
  const writeMessage = (response, blob) => {
    const id = blob._id || null;
    const message = `id: ${id}\nevent: message\ndata: ${JSON.stringify(blob)}\n\n`;
    response.write(message);
  };

  // Triggers on GET at the /events route
  app.get('/events', async function (request, response) {
    // Announce an SSE stream to the client and ask intermediaries not to cache it.
    const header = {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    };
    response.writeHead(200, "OK", header);

    let isClosed = false;
    let isHistorySent = false;
    // Live documents that arrive while the history query is still running.
    const pendingDocuments = [];

    const changeListener = (change) => {
      // Ignore events without fullDocument, e.g. deletes.
      if (!change.fullDocument) {
        return;
      }
      if (isHistorySent) {
        writeMessage(response, change.fullDocument);
      } else {
        // Hold the document back so the client never sees a live event
        // ahead of the older history entries.
        pendingDocuments.push(change.fullDocument);
      }
    };

    // Subscribe before querying so no change is missed during the query.
    changeStream.on("change", changeListener);
    response.on('close', () => {
      isClosed = true;
      changeStream.removeListener("change", changeListener);
    });

    try {
      const history = await collection.find()
        .sort({ $natural: -1 })
        .limit(historyNumber)
        .toArray();

      // The client may have disconnected while the query was running.
      if (isClosed) {
        return;
      }

      // The query returns newest first; replay it oldest first.
      for (const document of history.reverse()) {
        writeMessage(response, document);
      }
      for (const document of pendingDocuments) {
        writeMessage(response, document);
      }
      pendingDocuments.length = 0;
      isHistorySent = true;
    } catch (error) {
      // Without this handler a failed query would be an unhandled rejection
      // and the response would stay open without ever receiving data.
      console.error('Failed to load log history:', error);
      if (!isClosed) {
        response.end();
      }
    }
  });

  app.listen(PORT);
  console.log(`Server listening at 127.0.0.1:${PORT}`);
}
// Launch the server. A start-up failure is logged and the process is marked
// as failed so that it does not end with exit status 0.
run().catch((error) => {
  console.dir(error);
  process.exitCode = 1;
});