webentwicklung-frage-antwort-db.com.de

Python Asyncio - RuntimeError: Eine laufende Ereignisschleife kann nicht geschlossen werden

Ich versuche, diesen Fehler zu beheben: RuntimeError: Cannot close a running event loop in meinem Asyncio-Prozess. Ich glaube, dass dies passiert, weil ein Fehler auftritt, während Aufgaben noch ausstehen, und ich versuche, die Ereignisschleife zu schließen. Ich denke, ich muss die verbleibenden Antworten abwarten, bevor ich die Ereignisschleife schließe, aber ich bin mir nicht sicher, wie ich das in meiner spezifischen Situation richtig erreichen kann.

 def start_job(self):

        if self.auth_expire_timestamp < get_timestamp():
            api_obj = api_handler.Api('Api Name', self.dbObj)
            self.api_auth_resp = api_obj.get_auth_response()
            self.api_attr = api_obj.get_attributes()


        try:
            self.queue_manager(self.do_stuff(json_data))
        except aiohttp.ServerDisconnectedError as e:
            logging.info("Reconnecting...")
            api_obj = api_handler.Api('API Name', self.dbObj)
            self.api_auth_resp = api_obj.get_auth_response()
            self.api_attr = api_obj.get_attributes()
            self.run_eligibility()

async def do_stuff(self, data):

    tasks = []

    async with aiohttp.ClientSession() as session:
        for row in data:
            task = asyncio.ensure_future(self.async_post('url', session, row))
            tasks.append(task)
        result = await asyncio.gather(*tasks)
    self.load_results(result)


def queue_manager(self, method):
    self.loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(method)
    self.loop.run_until_complete(future)


async def async_post(self, resource, session, data):
        async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
            resp = []
            try:
                headers = response.headers['foo']
                content = await response.read()
                resp.append(headers)
                resp.append(content)
            except KeyError as e:
                logging.error('KeyError at async_post response')
                logging.error(e)
        return resp


def shutdown(self):
    //need to do something here to await the remaining tasks and then I need to re-start a new event loop, which i think i can do, just don't know how to appropriately stop the current one.
    self.loop.close() 
    return True

Wie kann ich mit dem Fehler umgehen und die Ereignisschleife ordnungsgemäß schließen, sodass ich eine neue starten und im Wesentlichen das gesamte Programm neu starten und fortfahren kann.

EDIT:

Das versuche ich jetzt, basierend auf dieser SO Antwort . Leider tritt dieser Fehler nur selten auf. Wenn ich ihn nicht erzwingen kann, muss ich abwarten, ob er funktioniert. In meiner queue_manager-Methode habe ich es folgendermaßen geändert:

try:
   self.loop.run_until_complete(future)
except Exception as e:
   future.cancel()
   self.loop.run_until_complete(future)
   future.exception()

UPDATE:

Ich wurde die shutdown()-Methode los und fügte dies stattdessen meiner queue_manager()-Methode hinzu und es scheint ohne Probleme zu funktionieren: 

  try:
        self.loop.run_until_complete(future)
    except Exception as e:
        future.cancel()
        self.check_in_records()
        self.reconnect()
        self.start_job()
        future.exception()
3
hyphen

Um die Frage wie ursprünglich angegeben zu beantworten, muss keine close()-Schleife ausgeführt werden. Sie können dieselbe Schleife für das gesamte Programm wiederverwenden.

In Anbetracht des Codes im Update könnte Ihr queue_manager folgendermaßen aussehen:

try:
    self.loop.run_until_complete(future)
except Exception as e:
    self.check_in_records()
    self.reconnect()
    self.start_job()

Das Abbrechen von future ist nicht erforderlich und hat, soweit ich das beurteilen kann, keine Auswirkungen. Dies unterscheidet sich von referenzierte Antwort , das speziell auf KeyboardInterrupt reagiert, da es von asyncio selbst ausgelöst wird. KeyboardInterrupt kann von run_until_complete weitergegeben werden, ohne dass die Zukunft tatsächlich abgeschlossen ist. Handhabung Ctrl-C richtig in asyncio ist sehr schwer oder sogar unmöglich (siehe hier für Details), aber zum Glück geht es nicht um die Frage Ctrl-C es geht überhaupt um ausnahmen, die von der coroutine gemacht werden. (Beachten Sie, dass KeyboardInterrupt nicht von Exception erbt, also im Fall von Ctrl-C Der Except Body wird nicht einmal ausgeführt.)

Ich habe die Zukunft abgebrochen, weil in diesem Fall noch Aufgaben ausstehen und ich diese Aufgaben im Wesentlichen entfernen und eine neue Ereignisschleife starten möchte.

Dies ist richtig, aber der Code in der (aktualisierten) Frage storniert nur eine einzelne Zukunft, die bereits an run_until_complete übergeben wurde. Denken Sie daran, dass eine Zukunft ein Platzhalter für einen Ergebniswert ist, der zu einem späteren Zeitpunkt bereitgestellt wird. Sobald der Wert bereitgestellt wurde, kann er durch Aufrufen von future.result() abgerufen werden. Wenn der "Wert" der Zukunft eine Ausnahme ist, löst future.result() diese Ausnahme aus. run_until_complete hat den Vertrag, dass die Ereignisschleife so lange ausgeführt wird, wie es für die angegebene Zukunft erforderlich ist, einen Wert zu erzeugen, und gibt diesen Wert dann zurück. Wenn der "Wert" tatsächlich eine Ausnahme darstellt, die ausgelöst werden muss, wird er von run_until_complete erneut ausgelöst. Zum Beispiel:

loop = asyncio.get_event_loop()
fut = loop.create_future()
loop.call_soon(fut.set_exception, ZeroDivisionError)
# raises ZeroDivisionError, as that is the future's result,
# manually set
loop.run_until_complete(fut)

Wenn die fragliche Zukunft tatsächlich ein Task ist, ein asynchrones Objekt, das eine Coroutine in eine Future einwickelt, ist das Ergebnis einer solchen Zukunft das von der Coroutine zurückgegebene Objekt. Wenn die Coroutine eine Ausnahme auslöst, wird sie beim Abrufen des Ergebnisses erneut ausgelöst, und auch run_until_complete:

async def fail():
    1/0

loop = asyncio.get_event_loop()
fut = loop.create_task(fail())
# raises ZeroDivisionError, as that is the future's result,
# because the coroutine raises it
loop.run_until_complete(fut)

Bei der Bearbeitung einer Aufgabe bedeutet run_until_complete, dass die Coroutine ebenfalls beendet wurde und entweder einen Wert zurückgegeben oder eine Ausnahme ausgelöst hat, wie durch run_until_complete bestimmt.

Das Abbrechen einer Aufgabe bewirkt andererseits, dass die Aufgabe fortgesetzt wird und der Ausdruck await, der sie angehalten hat, CancelledError auslöst. Wenn die Task diese Ausnahme nicht speziell abfängt und unterdrückt (was von gut verhaltenem Asynchroncode nicht erwartet wird), wird die Task nicht mehr ausgeführt und das Ergebnis ist CancelledError. Wenn die Coroutine jedoch bereits beendet ist, wenn cancel() aufgerufen wird, kann cancel() nichts tun, da es kein await gibt, in das CancelledError injiziert werden kann.

1
user4815162342