Denne del af selvstudiet beskriver, hvordan man implementerer en Redis-opgavekø til at håndtere tekstbehandling.
Opdateringer:
- 02/12/2020:Opgraderet til Python version 3.8.1 samt de nyeste versioner af Redis, Python Redis og RQ. Se nedenfor for detaljer. Nævn en fejl i den seneste RQ-version og giv en løsning. Løste http før https-fejlen.
- 22/03/2016:Opgraderet til Python version 3.5.1 samt de seneste versioner af Redis, Python Redis og RQ. Se nedenfor for detaljer.
- 22/02/2015:Tilføjet Python 3-understøttelse.
Gratis bonus: Klik her for at få adgang til en gratis Flask + Python-videovejledning, der viser dig, hvordan du bygger Flask-webapp, trin-for-trin.
Husk:Her er, hvad vi bygger – En Flask-app, der beregner ord-frekvenspar baseret på teksten fra en given URL.
- Del 1:Opsæt et lokalt udviklingsmiljø og implementer derefter både et iscenesættelses- og et produktionsmiljø på Heroku.
- Del to:Opsæt en PostgreSQL-database sammen med SQLAlchemy og Alembic til at håndtere migreringer.
- Del tre:Tilføj back-end-logikken for at skrabe og bearbejd derefter ordantallet fra en webside ved hjælp af anmodninger, BeautifulSoup og Natural Language Toolkit (NLTK) bibliotekerne.
- Fjerde del:Implementer en Redis-opgavekø til at håndtere tekstbehandlingen. (aktuel )
- Femte del:Indstil Angular på front-end for løbende at polle back-end for at se, om anmodningen er færdigbehandlet.
- Del seks:Skub til iscenesættelsesserveren på Heroku - opsætning af Redis og detaljer om, hvordan du kører to processer (web og worker) på en enkelt Dyno.
- Syvende del:Opdater front-end for at gøre den mere brugervenlig.
- Del otte:Opret et tilpasset vinkeldirektiv for at vise et frekvensfordelingsdiagram ved hjælp af JavaScript og D3.
Har du brug for koden? Få fat i det fra reposen.
Installationskrav
Brugte værktøjer:
- Redis (5.0.7)
- Python Redis (3.4.1)
- RQ (1.2.2) - et simpelt bibliotek til oprettelse af en opgavekø
Start med at downloade og installere Redis fra enten det officielle websted eller via Homebrew (brew install redis
). Når den er installeret, skal du starte Redis-serveren:
$ redis-server
Installer derefter Python Redis og RQ i et nyt terminalvindue:
$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt
Konfigurer Worker
Lad os starte med at oprette en arbejdsproces for at lytte efter opgaver i kø. Opret en ny fil worker.py , og tilføj denne kode:
import os
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
Her lyttede vi efter en kø kaldet default
og etablerede en forbindelse til Redis-serveren på localhost:6379
.
Start dette op i et andet terminalvindue:
$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...
Nu skal vi opdatere vores app.py at sende job til køen...
Opdater app.py
Tilføj følgende importer til app.py :
from rq import Queue
from rq.job import Job
from worker import conn
Opdater derefter konfigurationsafsnittet:
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
q = Queue(connection=conn)
from models import *
q = Queue(connection=conn)
oprettet en Redis-forbindelse og initialiseret en kø baseret på den forbindelse.
Flyt tekstbehandlingsfunktionen ud af vores indeksrute og ind i en ny funktion kaldet count_and_save_words()
. Denne funktion accepterer ét argument, en URL, som vi sender til den, når vi kalder den fra vores indeksrute.
def count_and_save_words(url):
errors = []
try:
r = requests.get(url)
except:
errors.append(
"Unable to get URL. Please make sure it's valid and try again."
)
return {"error": errors}
# text processing
raw = BeautifulSoup(r.text).get_text()
nltk.data.path.append('./nltk_data/') # set the path
tokens = nltk.word_tokenize(raw)
text = nltk.Text(tokens)
# remove punctuation, count raw words
nonPunct = re.compile('.*[A-Za-z].*')
raw_words = [w for w in text if nonPunct.match(w)]
raw_word_count = Counter(raw_words)
# stop words
no_stop_words = [w for w in raw_words if w.lower() not in stops]
no_stop_words_count = Counter(no_stop_words)
# save the results
try:
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
except:
errors.append("Unable to add item to database.")
return {"error": errors}
@app.route('/', methods=['GET', 'POST'])
def index():
results = {}
if request.method == "POST":
# this import solves a rq bug which currently exists
from app import count_and_save_words
# get url that the person has entered
url = request.form['url']
if not url[:8].startswith(('https://', 'http://')):
url = 'http://' + url
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
return render_template('index.html', results=results)
Bemærk følgende kode:
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
Bemærk: Vi skal importere count_and_save_words
funktion i vores funktion index
da RQ-pakken i øjeblikket har en fejl, hvor den ikke finder funktioner i det samme modul.
Her brugte vi køen, som vi initialiserede tidligere og kaldte enqueue_call()
fungere. Dette tilføjede et nyt job til køen, og det job kørte count_and_save_words()
funktion med URL'en som argument. result_ttl=5000
linjeargument fortæller RQ, hvor længe man skal holde på resultatet af jobbet i - 5.000 sekunder, i dette tilfælde. Så udsendte vi job-id'et til terminalen. Dette id er nødvendigt for at se, om jobbet er færdigbehandlet.
Lad os konfigurere en ny rute til det...
Få resultater
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
return str(job.result), 200
else:
return "Nay!", 202
Lad os teste dette af.
Tænd serveren, naviger til http://localhost:5000/, brug URL'en https://realpython.com, og tag job-id'et fra terminalen. Brug derefter dette id i '/results/'-slutpunktet - dvs. http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.
Så længe der er gået mindre end 5.000 sekunder, før du tjekker status, så skulle du se et id-nummer, som genereres, når vi tilføjer resultaterne til databasen:
# save the results
try:
from models import Result
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
Lad os nu omstrukturere ruten lidt for at returnere de faktiske resultater fra databasen i JSON:
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
result = Result.query.filter_by(id=job.result).first()
results = sorted(
result.result_no_stop_words.items(),
key=operator.itemgetter(1),
reverse=True
)[:10]
return jsonify(results)
else:
return "Nay!", 202
Sørg for at tilføje importen:
from flask import jsonify
Test dette igen. Hvis alt gik godt, skulle du se noget lignende i din browser:
[
[
"Python",
315
],
[
"intermediate",
167
],
[
"python",
161
],
[
"basics",
118
],
[
"web-dev",
108
],
[
"data-science",
51
],
[
"best-practices",
49
],
[
"advanced",
45
],
[
"django",
43
],
[
"flask",
41
]
]
Hvad er det næste?
Gratis bonus: Klik her for at få adgang til en gratis Flask + Python-videovejledning, der viser dig, hvordan du bygger Flask-webapp, trin-for-trin.
I del 5 bringer vi klienten og serveren sammen ved at tilføje Angular til blandingen for at skabe en poller, som sender en anmodning hvert femte sekund til /results/<job_key>
slutpunkt, der beder om opdateringer. Når dataene er tilgængelige, tilføjer vi dem til DOM.
Skål!
Dette er et samarbejde mellem Cam Linke, medstifter af Startup Edmonton, og folkene hos Real Python