webentwicklung-frage-antwort-db.com.de

Fortschrittsanzeige während pandas Operationen

Ich führe regelmäßig pandas Operationen mit Datenrahmen aus, die mehr als 15 Millionen Zeilen umfassen, und ich hätte gerne Zugriff auf eine Fortschrittsanzeige für bestimmte Operationen.

Gibt es eine textbasierte Fortschrittsanzeige für pandas Split-Apply-Combine-Operationen?

Zum Beispiel in etwas wie:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

woher feature_rollup ist eine etwas komplizierte Funktion, die viele DF Spalten benötigt und neue Benutzerspalten mit verschiedenen Methoden erstellt. Diese Vorgänge können bei großen Datenrahmen eine Weile dauern, daher würde ich gerne wissen, ob dies der Fall ist Es ist möglich, eine textbasierte Ausgabe in einem iPython-Notizbuch zu haben, die mich über den Fortschritt informiert.

Bisher habe ich kanonische Loop-Fortschrittsanzeigen für Python versucht, aber sie interagieren nicht mit pandas in irgendeiner sinnvollen Weise.

Ich hoffe, dass es etwas gibt, das ich in der pandas= library/documentation übersehen habe, das es einem ermöglicht, den Fortschritt eines Split-Apply-Mähdreschers zu kennen. Eine einfache Implementierung würde sich vielleicht auf die Gesamtzahl beziehen von Datenrahmen-Teilmengen, für die die Funktion apply ausgeführt wird, und geben den Fortschritt als den vollständigen Bruchteil dieser Teilmengen an.

Ist dies möglicherweise etwas, das der Bibliothek hinzugefügt werden muss?

97
cwharland

Aufgrund der großen Nachfrage hat tqdm Unterstützung für pandas hinzugefügt. Im Gegensatz zu den anderen Antworten verlangsamt sich dies nicht merklich pandas down - hier ist ein Beispiel für DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

Wenn Sie daran interessiert sind, wie dies funktioniert (und wie Sie es für Ihre eigenen Rückrufe ändern können), lesen Sie die Beispiele für github , die vollständige Dokumentation zu pypi oder den Import das Modul und führen Sie help(tqdm) aus.

[~ # ~] edit [~ # ~]


Um die ursprüngliche Frage direkt zu beantworten, ersetzen Sie:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

mit:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Hinweis: tqdm <= v4.8 : Für Versionen von tqdm unter 4.8 mussten Sie anstelle von tqdm.pandas() Folgendes tun:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
172
casper.dcl

Auf die Antwort von Tweak Jeff (und dies als wiederverwendbare Funktion).

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

Hinweis: Der Prozentsatz für den Anwendungsfortschritt pdates inline . Wenn Ihre Funktion nicht funktioniert, funktioniert dies nicht.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

Wie üblich können Sie dies als Methode zu Ihren Gruppenobjekten hinzufügen:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

Wie in den Kommentaren erwähnt, ist dies kein Feature, an dessen Implementierung Core pandas interessiert wäre. Aber python Damit können Sie diese für viele pandas Objekte/Methoden erstellen (dies wäre ein ziemlicher Arbeitsaufwand ... obwohl Sie diesen Ansatz verallgemeinern können sollten).

13
Andy Hayden

Falls Sie Unterstützung für die Verwendung in einem Jupyter/ipython-Notizbuch benötigen, wie ich es getan habe, finden Sie hier eine hilfreiche Anleitung und Quelle für relevanter Artikel :

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

Beachten Sie den Unterstrich in der import-Anweisung für _tqdm_notebook. Wie in dem genannten Artikel erwähnt, befindet sich die Entwicklung in der späten Beta-Phase.

7
Victor Vulovic

Sie können dies leicht mit einem Dekorateur tun

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

dann benutze einfach die modified_function (und ändere wann du es drucken willst)

4
Jeff

Für alle, die tqdm auf ihren benutzerdefinierten, parallelen Pandas-Apply-Code anwenden möchten.

(Im Laufe der Jahre habe ich einige Bibliotheken für die Parallelisierung ausprobiert, aber ich habe nie eine 100% ige Parallelisierungslösung gefunden, hauptsächlich für die Apply-Funktion, und ich musste immer wieder meinen "manuellen" Code anfordern.)

df_multi_core - das ist das, was du nennst. Es akzeptiert:

  1. Dein df Objekt
  2. Der Funktionsname, den Sie aufrufen möchten
  3. Die Teilmenge der Spalten, für die die Funktion ausgeführt werden kann (hilft, Zeit/Speicher zu reduzieren)
  4. Die Anzahl der Jobs, die parallel ausgeführt werden sollen (-1 oder für alle Kerne weggelassen)
  5. Alle anderen kwargs, die die df-Funktion akzeptiert (wie "axis")

_ df_split - Dies ist eine interne Hilfsfunktion, die global zum laufenden Modul positioniert werden muss (Pool.map ist "Placement-abhängig"). d lokalisieren es intern ..

hier ist der Code aus meinem Gist (Ich werde dort weitere pandas Funktionstests hinzufügen):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Bellow ist ein Testcode für eine parallelisierte anwenden mit tqtm "progress_apply".

from time import time
from tqdm import tqdm
tqdm.pandas()

if __== '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

In der Ausgabe wird 1 Fortschrittsbalken für die Ausführung ohne Parallelisierung und Pro-Core-Fortschrittsbalken für die Ausführung mit Parallelisierung angezeigt. Es gibt ein leichtes Hickup und manchmal erscheint der Rest der Kerne sofort, aber selbst dann halte ich es für nützlich, da Sie die Fortschrittsstatistiken pro Kern (it/sec und Gesamtaufzeichnungen, zum Beispiel) erhalten.

enter image description here

Vielen Dank an @abcdaa für diese großartige Bibliothek!

2
mork

Ich habe Jeffs Antwort geändert, um eine Gesamtsumme einzuschließen, damit Sie den Fortschritt und eine Variable verfolgen können, um nur alle X-Iterationen zu drucken (dies verbessert tatsächlich die Leistung um ein Vielfaches, wenn die "print_at" ist ziemlich hoch)

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

die Funktion clear_output () ist von

from IPython.core.display import clear_output

wenn nicht auf IPython, tut Andy Haydens Antwort das ohne es

0
Filipe Silva