MongoDB
 sql >> Datenbank >  >> NoSQL >> MongoDB

Die CosmosDb-Anforderungsrate ist bei insertMany hoch

Der Mongo-Treiber teilt Ihnen mit, bei welchen Datensätzen Fehler aufgetreten sind und welche überhaupt nicht verarbeitet wurden. Wenn alle Fehler (normalerweise einer) den Code 16500 haben, dann ist Ihr Problem die Drosselung und Sie versuchen es erneut bei Fehlern und die verbleibenden Datensätze sind sicher. Andernfalls werden Ihre Fehler durch etwas anderes verursacht und Sie sollten eine Analyse durchführen und entscheiden, ob Sie mit Wiederholungen fortfahren.

Der Mongo-Treiber gibt keinen HTTP-Header zurück, bei dem Cosmos DB eine Verzögerung vor dem erneuten Versuch vorschlägt, aber das ist keine große Sache. Eine Verzögerung garantiert sowieso keinen Erfolg, da andere Anforderungen, die dieselbe Datenbank treffen, möglicherweise RUs verbrauchen. Sie sind besser dran, zu experimentieren und Ihre eigenen Wiederholungsregeln festzulegen. Unten ist eine einfache rekursive Lösung, die es immer wieder versucht, bis alles in Ordnung ist oder das Wiederholungslimit erreicht ist.

    private async Task InsertManyWithRetry(IMongoCollection<BsonDocument> collection, 
        IEnumerable<BsonDocument> batch, int retries = 10, int delay = 300)
    {
        var batchArray = batch.ToArray();

        try
        {
            await collection.InsertManyAsync(batchArray);
        }
        catch (MongoBulkWriteException<BsonDocument> e)
        {
            if (retries <= 0)
                throw;

            //Check if there were any errors other than throttling.
            var realErrors = e.WriteErrors.Where(we => we.Code != 16500).ToArray();
            //Re-throw original exception for now.
            //TODO: We can make it more sophisticated by continuing with unprocessed records and collecting all errors from all retries.
            if (realErrors.Any())
                throw;

            //Take all records that had errors.
            var errors = e.WriteErrors.Select(we => batchArray[we.Index]);
            //Take all unprocessed records.
            var unprocessed = e.UnprocessedRequests
                .Where(ur => ur.ModelType == WriteModelType.InsertOne)
                .OfType<InsertOneModel<BsonDocument>>() 
                .Select(ur => ur.Document);

            var retryBatchArray = errors.Union(unprocessed).ToArray();

            _logger($"Retry {retryBatchArray.Length} records after {delay} ms");

            await Task.Delay(delay);

            await InsertManyWithRetry(collection, retryBatchArray, retries - 1, delay);
        }
    }