Mein aktueller Java/Spark-Unit-Test-Ansatz funktioniert (detailliert hier ), indem ein SparkContext mit "local" instanziiert und Unit-Tests mit JUnit ausgeführt werden.
Der Code muss so organisiert sein, dass er E/A in einer Funktion ausführt und eine andere mit mehreren RDDs aufruft.
Das funktioniert super. Ich habe eine sehr getestete Datenumwandlung in Java + Spark.
Kann ich dasselbe mit Python machen?
Wie würde ich Spark Unit-Tests mit Python ausführen?
Ich würde auch die Verwendung von py.test empfehlen. py.test macht es einfach, wiederverwendbare SparkContext-Test-Fixtures zu erstellen und damit prägnante Testfunktionen zu schreiben. Sie können Fixtures auch spezialisieren (zum Beispiel um einen StreamingContext zu erstellen) und einen oder mehrere davon in Ihren Tests verwenden.
Ich habe zu diesem Thema einen Blogbeitrag über Medium geschrieben:
https://engblog.nextdoor.com/unit-testing-Apache-spark-with-py-test-3b8970dc013b
Hier ist ein Ausschnitt aus dem Beitrag:
pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_Word_counts(spark_context):
""" test Word couting
Args:
spark_context: test fixture SparkContext
"""
test_input = [
' hello spark ',
' hello again spark spark'
]
input_rdd = spark_context.parallelize(test_input, 1)
results = wordcount.do_Word_counts(input_rdd)
expected_results = {'hello':2, 'spark':3, 'again':1}
assert results == expected_results
Hier ist eine Lösung mit pytest, wenn Sie Spark 2.x und SparkSession
verwenden. Ich importiere auch ein Paket eines Drittanbieters.
import logging
import pytest
from pyspark.sql import SparkSession
def quiet_py4j():
"""Suppress spark logging for the test context."""
logger = logging.getLogger('py4j')
logger.setLevel(logging.WARN)
@pytest.fixture(scope="session")
def spark_session(request):
"""Fixture for creating a spark context."""
spark = (SparkSession
.builder
.master('local[2]')
.config('spark.jars.packages', 'com.databricks:spark-avro_2.11:3.0.1')
.appName('pytest-pyspark-local-testing')
.enableHiveSupport()
.getOrCreate())
request.addfinalizer(lambda: spark.stop())
quiet_py4j()
return spark
def test_my_app(spark_session):
...
Beachten Sie, dass ich bei Verwendung von Python 3 Folgendes als PYSPARK_PYTHON-Umgebungsvariable angeben musste:
import os
import sys
IS_PY2 = sys.version_info < (3,)
if not IS_PY2:
os.environ['PYSPARK_PYTHON'] = 'python3'
Andernfalls erhalten Sie den Fehler:
Ausnahme: Python in Worker hat eine andere Version 2.7 als die in Treiber 3.5, PySpark kann nicht mit verschiedenen Nebenversionen ausgeführt werden. Bitte überprüfen Sie, ob die Umgebungsvariablen PYSPARK_PYTHON und PYSPARK_DRIVER_PYTHON korrekt eingestellt sind.
Angenommen, Sie haben pyspark
installiert, können Sie die folgende Klasse für unitTest in unittest
verwenden:
import unittest
import pyspark
class PySparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing")
cls.sc = pyspark.SparkContext(conf=conf)
cls.spark = pyspark.SQLContext(cls.sc)
@classmethod
def tearDownClass(cls):
cls.sc.stop()
Beispiel:
class SimpleTestCase(PySparkTestCase):
def test_with_rdd(self):
test_input = [
' hello spark ',
' hello again spark spark'
]
input_rdd = self.sc.parallelize(test_input, 1)
from operator import add
results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])
def test_with_df(self):
df = self.spark.createDataFrame(data=[[1, 'a'], [2, 'b']],
schema=['c1', 'c2'])
self.assertEqual(df.count(), 2)
Beachten Sie, dass dadurch ein Kontext pro Klasse erstellt wird. Verwenden Sie setUp
anstelle von setUpClass
, um einen Kontext pro Test zu erhalten. Dies erhöht in der Regel den Zeitaufwand für die Ausführung der Tests, da das Erstellen eines neuen spark context derzeit teuer ist.
Ich verwende pytest
, mit dem Sie Test-Fixtures erstellen können, um einen Pyspark-Kontext zu instanziieren und in all Ihre Tests zu integrieren, die dies erfordern. Etwas in der Art von
@pytest.fixture(scope="session",
params=[pytest.mark.spark_local('local'),
pytest.mark.spark_yarn('yarn')])
def spark_context(request):
if request.param == 'local':
conf = (SparkConf()
.setMaster("local[2]")
.setAppName("pytest-pyspark-local-testing")
)
Elif request.param == 'yarn':
conf = (SparkConf()
.setMaster("yarn-client")
.setAppName("pytest-pyspark-yarn-testing")
.set("spark.executor.memory", "1g")
.set("spark.executor.instances", 2)
)
request.addfinalizer(lambda: sc.stop())
sc = SparkContext(conf=conf)
return sc
def my_test_that_requires_sc(spark_context):
assert spark_context.textFile('/path/to/a/file').count() == 10
Anschließend können Sie die Tests im lokalen Modus ausführen, indem Sie py.test -m spark_local
Oder in YARN mit py.test -m spark_yarn
Aufrufen. Das hat bei mir ganz gut geklappt.
Vor einiger Zeit war ich auch mit dem gleichen Problem konfrontiert und nach dem Lesen mehrerer Artikel, Foren und einiger StackOverflow-Antworten habe ich ein kleines Plugin für pytest geschrieben: pytest-spark
Ich benutze es bereits seit einigen Monaten und der allgemeine Workflow sieht unter Linux gut aus: