Redis
 sql >> Datenbank >  >> NoSQL >> Redis

Flask by Example – Implementieren einer Redis-Aufgabenwarteschlange

Dieser Teil des Tutorials beschreibt, wie eine Redis-Aufgabenwarteschlange implementiert wird, um die Textverarbeitung zu handhaben.

Aktualisierungen:

  • 12.02.2020:Upgrade auf Python Version 3.8.1 sowie die neuesten Versionen von Redis, Python Redis und RQ. Siehe unten für Details. Erwähnen Sie einen Fehler in der neuesten RQ-Version und bieten Sie eine Lösung an. Fehler „http vor https“ behoben.
  • 22.03.2016:Upgrade auf Python Version 3.5.1 sowie die neuesten Versionen von Redis, Python Redis und RQ. Einzelheiten siehe unten.
  • 22.02.2015:Python 3-Unterstützung hinzugefügt.

Kostenloser Bonus: Klicken Sie hier, um Zugang zu einem kostenlosen Flask + Python-Video-Tutorial zu erhalten, das Ihnen Schritt für Schritt zeigt, wie Sie eine Flask-Webanwendung erstellen.

Denken Sie daran:Wir bauen Folgendes:Eine Flask-App, die Worthäufigkeitspaare basierend auf dem Text einer bestimmten URL berechnet.

  1. Teil 1:Richten Sie eine lokale Entwicklungsumgebung ein und stellen Sie dann sowohl eine Staging- als auch eine Produktionsumgebung auf Heroku bereit.
  2. Teil Zwei:Richten Sie eine PostgreSQL-Datenbank zusammen mit SQLAlchemy und Alembic ein, um Migrationen zu handhaben.
  3. Teil Drei:Fügen Sie die Back-End-Logik hinzu, um die Wortzählungen von einer Webseite mithilfe der Bibliotheken Requests, BeautifulSoup und Natural Language Toolkit (NLTK) zu schaben und dann zu verarbeiten.
  4. Teil Vier:Implementieren Sie eine Redis-Aufgabenwarteschlange für die Textverarbeitung. (aktuell )
  5. Teil Fünf:Richten Sie Angular am Front-End ein, um das Back-End kontinuierlich abzufragen, um zu sehen, ob die Verarbeitung der Anfrage abgeschlossen ist.
  6. Teil Sechs:Auf den Staging-Server auf Heroku pushen – Redis einrichten und detailliert beschreiben, wie zwei Prozesse (Web und Worker) auf einem einzigen Dyno ausgeführt werden.
  7. Teil 7:Aktualisieren Sie das Front-End, um es benutzerfreundlicher zu machen.
  8. Teil Acht:Erstellen Sie eine benutzerdefinierte Angular-Direktive, um ein Häufigkeitsverteilungsdiagramm mit JavaScript und D3 anzuzeigen.

Benötigen Sie den Code? Holen Sie es aus dem Repo.


Installationsvoraussetzungen

Verwendete Tools:

  • Redis (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2) - eine einfache Bibliothek zum Erstellen einer Aufgabenwarteschlange

Beginnen Sie mit dem Herunterladen und Installieren von Redis entweder von der offiziellen Website oder über Homebrew (brew install redis ). Starten Sie nach der Installation den Redis-Server:

$ redis-server

Als nächstes installieren Sie Python Redis und RQ in einem neuen Terminalfenster:

$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt


Worker einrichten

Beginnen wir damit, einen Arbeitsprozess zu erstellen, der auf Aufgaben in der Warteschlange wartet. Erstellen Sie eine neue Datei worker.py , und fügen Sie diesen Code hinzu:

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Hier haben wir auf eine Warteschlange namens default gelauscht und eine Verbindung zum Redis-Server auf localhost:6379 hergestellt .

Starten Sie dies in einem anderen Terminalfenster:

$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Jetzt müssen wir unsere app.py aktualisieren um Aufträge an die Warteschlange zu senden…



Aktualisiere app.py

Fügen Sie die folgenden Importe zu app.py hinzu :

from rq import Queue
from rq.job import Job
from worker import conn

Aktualisieren Sie dann den Konfigurationsabschnitt:

app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) Richten Sie eine Redis-Verbindung ein und initialisieren Sie eine Warteschlange basierend auf dieser Verbindung.

Verschieben Sie die Textverarbeitungsfunktionalität aus unserer Indexroute heraus und in eine neue Funktion namens count_and_save_words() . Diese Funktion akzeptiert ein Argument, eine URL, die wir ihr übergeben, wenn wir sie von unserer Indexroute aufrufen.

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url[:8].startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Beachten Sie den folgenden Code:

job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Hinweis: Wir müssen die count_and_save_words importieren Funktion in unserer Funktion index da das RQ-Paket derzeit einen Fehler hat, bei dem es keine Funktionen im selben Modul findet.

Hier haben wir die zuvor initialisierte Warteschlange verwendet und enqueue_call() aufgerufen Funktion. Dadurch wurde der Warteschlange ein neuer Job hinzugefügt, und dieser Job führte count_and_save_words() aus Funktion mit der URL als Argument. Das result_ttl=5000 Das Zeilenargument teilt RQ mit, wie lange das Ergebnis des Jobs gespeichert werden soll - in diesem Fall 5.000 Sekunden. Dann haben wir die Job-ID an das Terminal ausgegeben. Diese ID wird benötigt, um zu sehen, ob die Verarbeitung des Jobs abgeschlossen ist.

Lassen Sie uns dafür eine neue Route einrichten …



Ergebnisse abrufen

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Lassen Sie uns das testen.

Starten Sie den Server, navigieren Sie zu http://localhost:5000/, verwenden Sie die URL https://realpython.com und holen Sie sich die Job-ID vom Terminal. Verwenden Sie dann diese ID im Endpunkt „/results/“, d. h. http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

Solange weniger als 5.000 Sekunden vergangen sind, bevor Sie den Status überprüfen, sollten Sie eine ID-Nummer sehen, die generiert wird, wenn wir die Ergebnisse zur Datenbank hinzufügen:

# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Lassen Sie uns nun die Route leicht umgestalten, um die tatsächlichen Ergebnisse aus der Datenbank in JSON zurückzugeben:

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Achten Sie darauf, den Import hinzuzufügen:

from flask import jsonify

Teste das nochmal aus. Wenn alles gut gegangen ist, sollten Sie in Ihrem Browser etwas Ähnliches sehen:

[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]


Was kommt als Nächstes?

Kostenloser Bonus: Klicken Sie hier, um Zugang zu einem kostenlosen Flask + Python-Video-Tutorial zu erhalten, das Ihnen Schritt für Schritt zeigt, wie Sie eine Flask-Webanwendung erstellen.

In Teil 5 bringen wir Client und Server zusammen, indem wir Angular in die Mischung einfügen, um einen Poller zu erstellen, der alle fünf Sekunden eine Anfrage an /results/<job_key> sendet Endpunkt, der nach Updates fragt. Sobald die Daten verfügbar sind, fügen wir sie dem DOM hinzu.

Prost!

Dies ist eine Zusammenarbeit zwischen Cam Linke, Mitbegründer von Startup Edmonton, und den Leuten von Real Python