db.collection.aggregate(
[
{
"$addFields": {
"indexes": {
"$range": [
0,
{
"$size": "$time_series"
}
]
},
"reversedSeries": {
"$reverseArray": "$time_series"
}
}
},
{
"$project": {
"derivatives": {
"$reverseArray": {
"$slice": [
{
"$map": {
"input": {
"$zip": {
"inputs": [
"$reversedSeries",
"$indexes"
]
}
},
"in": {
"$subtract": [
{
"$arrayElemAt": [
"$$this",
0
]
},
{
"$arrayElemAt": [
"$reversedSeries",
{
"$add": [
{
"$arrayElemAt": [
"$$this",
1
]
},
1
]
}
]
}
]
}
}
},
{
"$subtract": [
{
"$size": "$time_series"
},
1
]
}
]
}
},
"time_series": 1
}
}
]
)
Dazu können wir die obige Pipeline in Version 3.4+ verwenden. In der Pipeline verwenden wir den $addFields
Pipeline-Phase. -Operator, um das Array des Elementindex von "time_series" zum Dokument hinzuzufügen, haben wir auch das Zeitreihen-Array umgekehrt und dem Dokument hinzugefügt, indem wir jeweils den $range
und $reverseArray
Operatoren
Wir haben das Array hier umgekehrt, weil das Element an Position p
im Array ist immer größer als das Element an Position p+1
was bedeutet, dass [p] - [p+1] <0
und wir möchten den $multiply
nicht verwenden
hier. (siehe Pipeline für Version 3.2)
Als nächstes $zipped
die Zeitreihendaten mit dem Indexarray und wendete einen subtract
Ausdruck in das resultierende Array mit $map
Betreiber.
Dann $slice
das Ergebnis, um null/None
zu verwerfen Wert aus dem Array und kehrte das Ergebnis um.
In 3.2 können wir den $unwind
verwenden
Operator zum Entspannen unser Array und schließen Sie den Index jedes Elements in das Array ein, indem Sie ein Dokument als Operanden anstelle des traditionellen "Pfads" mit dem Präfix $ angeben .
Als nächstes in der Pipeline müssen wir $group
unsere Dokumente und verwenden Sie den $push
Akkumulator-Operator, um ein Array von Unterdokumenten zurückzugeben, die wie folgt aussehen:
{
"_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
"time_series" : [
{ "value" : 10, "index" : NumberLong(0) },
{ "value" : 20, "index" : NumberLong(1) },
{ "value" : 40, "index" : NumberLong(2) },
{ "value" : 70, "index" : NumberLong(3) },
{ "value" : 110, "index" : NumberLong(4) }
]
}
Schließlich kommt der $project
Bühne. In dieser Phase müssen wir den $map
-Operator, um eine Reihe von Ausdrücken auf jedes Element im neu berechneten Array in $group
anzuwenden Stufe.
Folgendes passiert in der $map
(siehe $map
als for-Schleife) in Ausdruck:
Für jedes Unterdokument weisen wir den Wert zu -Feld in eine Variable mithilfe von $let
Variablenoperator. Wir subtrahieren dann seinen Wert vom Wert des "Wert"-Felds des nächsten Elements im Array.
Da das nächste Element im Array das Element am aktuellen Index plus eins ist, brauchen wir nur die Hilfe von $arrayElemAt
-Operator und einen einfachen $add
ition des Index des aktuellen Elements und 1
.
Der $subtract
Ausdruck gibt einen negativen Wert zurück, also müssen wir den Wert mit -1
multiplizieren mit $multiply
Betreiber.
Wir müssen auch $filter
das resultierende Array, weil es das letzte Element None
ist oder null
. Der Grund dafür ist, dass, wenn das aktuelle Element das letzte Element ist, $subtract
gibt Keine
zurück weil der Index des nächsten Elements gleich der Größe des Arrays ist.
db.collection.aggregate([
{
"$unwind": {
"path": "$time_series",
"includeArrayIndex": "index"
}
},
{
"$group": {
"_id": "$_id",
"time_series": {
"$push": {
"value": "$time_series",
"index": "$index"
}
}
}
},
{
"$project": {
"time_series": {
"$filter": {
"input": {
"$map": {
"input": "$time_series",
"as": "el",
"in": {
"$multiply": [
{
"$subtract": [
"$$el.value",
{
"$let": {
"vars": {
"nextElement": {
"$arrayElemAt": [
"$time_series",
{
"$add": [
"$$el.index",
1
]
}
]
}
},
"in": "$$nextElement.value"
}
}
]
},
-1
]
}
}
},
"as": "item",
"cond": {
"$gte": [
"$$item",
0
]
}
}
}
}
}
])
Eine andere Option, die meiner Meinung nach weniger effizient ist, besteht darin, eine Map/Reduce-Operation für unsere Sammlung mit map_reduce
Methode.
>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
... function() {
... var derivatives = [];
... for (var index=1; index<this.time_series.length; index++) {
... derivatives.push(this.time_series[index] - this.time_series[index-1]);
... }
... emit(this._id, derivatives);
... }
... """)
>>> reducer = Code("""
... function(key, value) {}
... """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
... print(res) # or do something with the document.
...
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}
Sie können auch das gesamte Dokument abrufen und die numpy.diff
verwenden um die Ableitung wie folgt zurückzugeben:
import numpy as np
for document in collection.find({}, {'time_series': 1}):
result = np.diff(document['time_series'])