73 lines
2.2 KiB
JavaScript
73 lines
2.2 KiB
JavaScript
const { MongoClient, MongoClientOptions } = require("mongodb");
|
|
const express = require('express');
|
|
|
|
/*============== VARIABLE DECLARATION ==============*/
|
|
// Mongo set-up variables
|
|
const mongoCollection = process.env.MONGO_COLLECTION || 'log';
|
|
const mongoUri = process.env.MONGODB_HOST || 'mongodb://127.0.0.1:27017/default?replicaSet=rs0';
|
|
|
|
const historyNumber = parseInt(process.env.HISTORY_AMOUNT) || 50;
|
|
|
|
// Stream set-up variables
|
|
let changeStream;
|
|
const options = { fullDocument: "updateLookup" };
|
|
const pipeline = [];
|
|
const PORT = process.env.PORT || 3002;
|
|
|
|
// Create Mongo client
|
|
const mongoOptions = {
|
|
readPreference: 'secondaryPreferred'
|
|
}
|
|
const mongoClient = new MongoClient(mongoUri, mongoOptions);
|
|
|
|
/*============== CODE ==============*/
|
|
async function run() {
|
|
|
|
console.log('server.js has been launched');
|
|
const app = express();
|
|
|
|
await mongoClient.connect();
|
|
const collection = mongoClient.db().collection(mongoCollection);
|
|
|
|
changeStream = collection.watch(pipeline, options);
|
|
console.log("Started watching changes in database");
|
|
|
|
const writeMessage = (response, blob) => {
|
|
const id = blob._id || null
|
|
const message = `id: ${id}\nevent: message\ndata: ${JSON.stringify(blob)}\n\n`
|
|
response.write(message)
|
|
}
|
|
|
|
// Triggers on GET at /event route
|
|
app.get('/events', async function (request, response) {
|
|
|
|
// Notify SSE to React
|
|
const header = { 'Content-Type': 'text/event-stream', 'Connection': 'keep-alive' };
|
|
response.writeHead(200, "OK", header);
|
|
|
|
const historyCursor = collection.find()
|
|
.sort({$natural:-1})
|
|
.limit(historyNumber).toArray().then((res) => {
|
|
res.reverse().forEach((document) => {
|
|
writeMessage(response, document)
|
|
})
|
|
});
|
|
|
|
const changeListener = async (change) => {
|
|
// Ignore events without fullDocument, e.g. deletes.
|
|
if (change.fullDocument) {
|
|
writeMessage(response, change.fullDocument)
|
|
}
|
|
}
|
|
changeStream.on("change", changeListener);
|
|
response.on('close', () => {
|
|
changeStream.removeListener("change", changeListener)
|
|
})
|
|
});
|
|
|
|
app.listen(PORT);
|
|
console.log(`Server listening at 127.0.0.1:${PORT}`);
|
|
}
|
|
|
|
run().catch(console.dir);
|