Upserts auf Arrays von Sub-Dokumenten in MongoDB
Stellen wir uns folgendes Szenario vor: auf unserem Dashboard wollen wir die Gesamtanzahl eines Ereignisses für einen Sensor in den letzten 24 Stunden anzeigen. Diese Daten wollen wir nicht mit einem separaten Request holen, sondern direkt mit dem Sensor laden. Da wir davon ausgehen, dass eine größere Menge von Sensoren gibt, wollen wir die Daten vorbereiten, so dass wir nicht auf eine Aggregation mit $lookup
-Stage angewiesen sind. Einerseits ist das $lookup
auf eine andere Collection nicht sehr performant, andererseits gibt es in MicroService-Szenarien unter Umständen auch keinen direkten Zugriff auf die Ereignisse zu einem Sensor, da diese in einer anderen Datenbank(-technologie) abgelegt sind.
Die folgende Dokumentenstruktur für einen Sensor wäre hierfür gut geeignet:
{
"_id": ObjectId("5a934e000102030405000000"),
"title": "Sensor 1",
"totals": [
{
"timestamp": ISODate("2022-09-12T06:00:00Z"),
"count": 1
},
{
"timestamp": ISODate("2022-09-12T07:00:00Z"),
"count": 2
}
]
}
Im obigen Beispiel steht die title
-Property für die allgemeinen Metadaten des Sensors, das totals
-Array enthält die Anzahl der Ereignisse in stündlicher Aggregation.
Beim Empfang neuer Sensordaten wollen wir also die Anzahl der neuen Ereignisse auf die entsprechende Stunde addieren oder einen neuen Eintrag anlegen, wenn es für die entsprechende Stunde noch keinen Eintrag gibt. Gleichzeitig wollen wir veraltete Einträge aus dem Array löschen, um die Dokumentengröße nicht ins Unendliche wachsen zu lassen. Hierbei können wir ein Update mit Aggregation-Pipeline nutzen, durch das wir erweiterte Möglichkeiten beim Update haben.
Die Aggregation-Pipeline wird in unserem Beispiel aus drei Stages bestehen (in unserem Beispiel gibt es neue 123 Einträge für 10:00:00 Uhr am 12.09.2022 UTC; Einträge vor 14:00 Uhr am 11.09.2022 UTC sollen gelöscht werden):
Initialisierung des totals
-Arrays
Dieser Schritt ist nur notwendig, wenn noch nicht alle Dokumente die totals
-Property enthalten. Durch die Initialisierung mit einem leeren Array ist sichergestellt, dass das Update auch bei Dokumenten funktioniert, auf denen die Property bisher nicht vorhanden ist.
{
"$set": {
"totals": {
"$ifNull": [
"$totals",
[]
]
}
}
}
Entfernen der obsoleten Einträge aus dem Array
In unserem Beispiel geht es um einen 24-Stunden-Zeitraum, so dass wir ältere Einträge aus dem Array entfernen können und maximal 24 Einträge im Array haben werden. Dies dient zur Reduzierung der Dokumentengröße.
{
"$set": {
"totals": {
"$filter": {
"input": "$totals",
"cond": {
"$gte": [
"$$this.timestamp",
ISODate("2022-09-11T14:00:00Z")
]
}
}
}
}
}
Upsert auf das Array
Hierbei wird zuerst geprüft, ob für den aktuellen Stundenzeitraum schon ein Eintrag vorhanden ist. Falls ja, wird dieser Eintrag im Array gesucht und um die Anzahl der neuen Ereignisse erhöht. Ist noch kein Eintrag für die Stunde vorhanden, wird ein neues Sub-Dokument eingefügt.
{
$set: {
totals: {
"$cond": {
"if": {
"$in": [
ISODate("2022-09-12T10:00:00Z"),
"$totals.timestamp"
]
},
"then": {
"$map": {
"input": "$totals",
"in": {
"$cond": {
"if": {
"$eq": [
"$$this.timestamp",
ISODate("2022-09-12T10:00:00Z")
]
},
"then": {
"$mergeObjects": [
"$$this",
{
count: {
"$add": [
{
"$ifNull": [
"$$this.count",
0
]
},
123
]
}
}
]
},
"else": "$$this"
}
}
}
},
"else": {
"$concatArrays": [
"$totals",
[
{
"timestamp": ISODate("2022-09-12T10:00:00Z"),
"count": 123
}
]
]
}
}
}
}
}
Diese letzte Stage ist sicherlich die umfangreichste und auf den ersten Blick nicht ganz einfach zu überblicken. Die wesentlichen Elemente sind:
- Die äußere
$cond
-Bedingung prüft, ob schon ein Element für die Stunde im Array vorhanden ist. - Falls ja, wird ein
$map
durchgeführt, durch das die einzelnen Elemente des Array durchlaufen werden.- Eine weitere, die innere
$cond
dient dazu, zu prüfen, ob das aktuelle Array-Element zur gesuchten Zeit passt. - Falls ja, wird das Array-Element durch
$mergeObjects
aus dem bestehenden Element und einem Objekt, bei dem die Anzahl der neuen Ereignisse zu den bestehenden Events addiert wird ($add
) erzeugt. Auch beim Addieren wird sicherheitshalber auf null-Werte geprüft. - Alle anderen Array-Elemente werden ohne Änderung zurückgegeben (
$$this
).
- Eine weitere, die innere
- Falls nein, wird das neue Array-Element an das bestehende Array per
$concatArrays
angefügt.
Das Beispiel steht als Mongo-Playground zur Verfügung und kann dort getestet werden.
Auslesen der Daten
Das Auslesen der Daten inklusive der Ereignisanzahl kann nun mit einer Aggregation-Pipeline erfolgen. Wichtig ist hier, dass obsolete Einträge ebenfalls entfernt werden, um akkurate Zahlen zu erhalten, auch wenn in den letzten 24 Stunden kein Update erfolgt ist:
[
{
"$set": {
"totals": {
"$ifNull": [
"$totals",
[]
]
}
}
},
{
"$set": {
"totals": {
"$filter": {
"input": "$totals",
"cond": {
"$gte": [
"$$this.timestamp",
ISODate("2022-09-11T14:00:00Z")
]
}
}
}
}
},
{
"$set": {
"count": {
"$sum": "$totals.count"
}
}
},
{
"$unset": "totals"
}
]
Sicherheitshalber wird in obiger Pipeline wieder sichergestellt, dass das totals
-Array initialisiert ist. Die Pipeline kann aus Ausgangspunkt genutzt werden, um nach der Anzahl der Ereignisse zu sortieren oder zu filtern.
Auf Basis unseres Beispiels werden folgende Daten ausgegeben:
[
{
"_id": ObjectId("5a934e000102030405000000"),
"count": 138,
"title": "Sensor 1"
},
{
"_id": ObjectId("5a934e000102030405000001"),
"count": 123,
"title": "Sensor 2"
}
]
Auch dieses Beispiel steht als Mongo-Playground zum Ausprobieren zur Verfügung.
Fazit
Diese Vorgehensweise ermöglicht Upserts auf Arrays, so dass die Daten direkt am Sensor vorbereitet werden. Durch das Ablegen aller Daten direkt am Sensor werden auch MicroService-Szenarien gut unterstützt, bei denen die Ereignisdaten an sich u.U. in einem anderen MicroService verarbeitet werden und die Ansicht über die Sensoren trotzdem einen Überblick über die aktuellen Ereignisanzahlen geben soll.
Das obige Beispiel hat lediglich Ereignisanzahlen aggregiert, um als Beispiel übersichtlich zu bleiben. Es ist darüber hinaus natürlich ohne Weiteres möglich, Durchschnitte und Extremwerte für den Stundenzeitraum zu berechnen.