Ich führe regelmäßig pandas Operationen mit Datenrahmen aus, die mehr als 15 Millionen Zeilen umfassen, und ich hätte gerne Zugriff auf eine Fortschrittsanzeige für bestimmte Operationen.
Gibt es eine textbasierte Fortschrittsanzeige für pandas Split-Apply-Combine-Operationen?
Zum Beispiel in etwas wie:
df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)
woher feature_rollup
ist eine etwas komplizierte Funktion, die viele DF Spalten benötigt und neue Benutzerspalten mit verschiedenen Methoden erstellt. Diese Vorgänge können bei großen Datenrahmen eine Weile dauern, daher würde ich gerne wissen, ob dies der Fall ist Es ist möglich, eine textbasierte Ausgabe in einem iPython-Notizbuch zu haben, die mich über den Fortschritt informiert.
Bisher habe ich kanonische Loop-Fortschrittsanzeigen für Python versucht, aber sie interagieren nicht mit pandas in irgendeiner sinnvollen Weise.
Ich hoffe, dass es etwas gibt, das ich in der pandas= library/documentation übersehen habe, das es einem ermöglicht, den Fortschritt eines Split-Apply-Mähdreschers zu kennen. Eine einfache Implementierung würde sich vielleicht auf die Gesamtzahl beziehen von Datenrahmen-Teilmengen, für die die Funktion apply
ausgeführt wird, und geben den Fortschritt als den vollständigen Bruchteil dieser Teilmengen an.
Ist dies möglicherweise etwas, das der Bibliothek hinzugefügt werden muss?
Aufgrund der großen Nachfrage hat tqdm
Unterstützung für pandas
hinzugefügt. Im Gegensatz zu den anderen Antworten verlangsamt sich dies nicht merklich pandas down - hier ist ein Beispiel für DataFrameGroupBy.progress_apply
:
import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm # for notebooks
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()
# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)
Wenn Sie daran interessiert sind, wie dies funktioniert (und wie Sie es für Ihre eigenen Rückrufe ändern können), lesen Sie die Beispiele für github , die vollständige Dokumentation zu pypi oder den Import das Modul und führen Sie help(tqdm)
aus.
[~ # ~] edit [~ # ~]
Um die ursprüngliche Frage direkt zu beantworten, ersetzen Sie:
df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)
mit:
from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)
Hinweis: tqdm <= v4.8 : Für Versionen von tqdm unter 4.8 mussten Sie anstelle von tqdm.pandas()
Folgendes tun:
from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
Auf die Antwort von Tweak Jeff (und dies als wiederverwendbare Funktion).
def logged_apply(g, func, *args, **kwargs):
step_percentage = 100. / len(g)
import sys
sys.stdout.write('apply progress: 0%')
sys.stdout.flush()
def logging_decorator(func):
def wrapper(*args, **kwargs):
progress = wrapper.count * step_percentage
sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
sys.stdout.flush()
wrapper.count += 1
return func(*args, **kwargs)
wrapper.count = 0
return wrapper
logged_func = logging_decorator(func)
res = g.apply(logged_func, *args, **kwargs)
sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
sys.stdout.flush()
return res
Hinweis: Der Prozentsatz für den Anwendungsfortschritt pdates inline . Wenn Ihre Funktion nicht funktioniert, funktioniert dies nicht.
In [11]: g = df_users.groupby(['userID', 'requestDate'])
In [12]: f = feature_rollup
In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]:
...
Wie üblich können Sie dies als Methode zu Ihren Gruppenobjekten hinzufügen:
from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply
In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]:
...
Wie in den Kommentaren erwähnt, ist dies kein Feature, an dessen Implementierung Core pandas interessiert wäre. Aber python Damit können Sie diese für viele pandas Objekte/Methoden erstellen (dies wäre ein ziemlicher Arbeitsaufwand ... obwohl Sie diesen Ansatz verallgemeinern können sollten).
Falls Sie Unterstützung für die Verwendung in einem Jupyter/ipython-Notizbuch benötigen, wie ich es getan habe, finden Sie hier eine hilfreiche Anleitung und Quelle für relevanter Artikel :
from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)
Beachten Sie den Unterstrich in der import-Anweisung für _tqdm_notebook
. Wie in dem genannten Artikel erwähnt, befindet sich die Entwicklung in der späten Beta-Phase.
Sie können dies leicht mit einem Dekorateur tun
from functools import wraps
def logging_decorator(func):
@wraps
def wrapper(*args, **kwargs):
wrapper.count += 1
print "The function I modify has been called {0} times(s).".format(
wrapper.count)
func(*args, **kwargs)
wrapper.count = 0
return wrapper
modified_function = logging_decorator(feature_rollup)
dann benutze einfach die modified_function (und ändere wann du es drucken willst)
Für alle, die tqdm auf ihren benutzerdefinierten, parallelen Pandas-Apply-Code anwenden möchten.
(Im Laufe der Jahre habe ich einige Bibliotheken für die Parallelisierung ausprobiert, aber ich habe nie eine 100% ige Parallelisierungslösung gefunden, hauptsächlich für die Apply-Funktion, und ich musste immer wieder meinen "manuellen" Code anfordern.)
df_multi_core - das ist das, was du nennst. Es akzeptiert:
_ df_split - Dies ist eine interne Hilfsfunktion, die global zum laufenden Modul positioniert werden muss (Pool.map ist "Placement-abhängig"). d lokalisieren es intern ..
hier ist der Code aus meinem Gist (Ich werde dort weitere pandas Funktionstests hinzufügen):
import pandas as pd
import numpy as np
import multiprocessing
from functools import partial
def _df_split(tup_arg, **kwargs):
split_ind, df_split, df_f_name = tup_arg
return (split_ind, getattr(df_split, df_f_name)(**kwargs))
def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
if njobs == -1:
njobs = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=njobs)
try:
splits = np.array_split(df[subset], njobs)
except ValueError:
splits = np.array_split(df, njobs)
pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
results = pool.map(partial(_df_split, **kwargs), pool_data)
pool.close()
pool.join()
results = sorted(results, key=lambda x:x[0])
results = pd.concat([split[1] for split in results])
return results
Bellow ist ein Testcode für eine parallelisierte anwenden mit tqtm "progress_apply".
from time import time
from tqdm import tqdm
tqdm.pandas()
if __== '__main__':
sep = '-' * 50
# tqdm progress_apply test
def apply_f(row):
return row['c1'] + 0.1
N = 1000000
np.random.seed(0)
df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})
print('testing pandas apply on {}\n{}'.format(df.shape, sep))
t1 = time()
res = df.progress_apply(apply_f, axis=1)
t2 = time()
print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))
t3 = time()
# res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
t4 = time()
print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))
In der Ausgabe wird 1 Fortschrittsbalken für die Ausführung ohne Parallelisierung und Pro-Core-Fortschrittsbalken für die Ausführung mit Parallelisierung angezeigt. Es gibt ein leichtes Hickup und manchmal erscheint der Rest der Kerne sofort, aber selbst dann halte ich es für nützlich, da Sie die Fortschrittsstatistiken pro Kern (it/sec und Gesamtaufzeichnungen, zum Beispiel) erhalten.
Vielen Dank an @abcdaa für diese großartige Bibliothek!
Ich habe Jeffs Antwort geändert, um eine Gesamtsumme einzuschließen, damit Sie den Fortschritt und eine Variable verfolgen können, um nur alle X-Iterationen zu drucken (dies verbessert tatsächlich die Leistung um ein Vielfaches, wenn die "print_at" ist ziemlich hoch)
def count_wrapper(func,total, print_at):
def wrapper(*args):
wrapper.count += 1
if wrapper.count % wrapper.print_at == 0:
clear_output()
sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
sys.stdout.flush()
return func(*args)
wrapper.count = 0
wrapper.total = total
wrapper.print_at = print_at
return wrapper
die Funktion clear_output () ist von
from IPython.core.display import clear_output
wenn nicht auf IPython, tut Andy Haydens Antwort das ohne es