Testing ETL with Spark using Pytest


Mich Talebzadeh
I was a bit confused by the use of fixtures in Pytest, with dataframes passed as an input pipeline from one fixture to another. I wrote this up after spending some time on it. As usual it is heuristic rather than strictly by the book, so to speak.

With PySpark and PyCharm you can ETL from Hive to BigQuery, from Oracle to Hive, etc. However, for Pytest I decided to use MySQL as the database of choice for testing, with a small sample of data (200 rows). I mentioned fixtures. Simply put, "Fixtures are functions, which will run before each test function to which they are applied, to prepare data. Fixtures are used to feed some data to the tests, such as database connections." If you have an ordering like read data (Extract), do something with it (Transform) and save it somewhere (Load), using Spark, then these all happen in memory, with dataframes feeding each other.

The crucial thing to remember is that fixtures are passed to each other as parameters, not by invoking them directly!

Example:

## This is correct
@pytest.fixture(scope = "session")
def transformData(readSourceData):   ## fixture passed as parameter
    # this is incorrect (you cannot call a fixture inside another fixture):
    # read_df = readSourceData()

So this operation becomes

transformation_df = readSourceData. \
    select( \
    ....

Say in PyCharm, under the tests package, you create a package "fixtures" (just a name, nothing to do with pytest's "fixture") and in there you put the ETL Python modules that prepare data for you. Example:

### file --> saveData.py
@pytest.fixture(scope = "session")
def saveData(transformData):
    # Write to test target table
    try:
        transformData. \
            write. \
            format("jdbc"). \
            ....


You then drive the tests by creating a file called conftest.py under the tests package. You make your fixture files available by importing them in that file, as below:

import pytest
from tests.fixtures.extractHiveData import extractHiveData
from tests.fixtures.loadIntoMysqlTable import loadIntoMysqlTable
from tests.fixtures.readSourceData import readSourceData
from tests.fixtures.transformData import transformData
from tests.fixtures.saveData import saveData
from tests.fixtures.readSavedData import readSavedData

Then you have your test Python file, say test_oracle.py, under the tests package, and put the assertions there:

import pytest
from src.config import ctest

@pytest.mark.usefixtures("extractHiveData")
def test_extract(extractHiveData):
    assert extractHiveData.count() > 0

@pytest.mark.usefixtures("loadIntoMysqlTable")
def test_loadIntoMysqlTable(loadIntoMysqlTable):
    assert loadIntoMysqlTable

@pytest.mark.usefixtures("readSavedData")
def test_readSourceData(readSourceData):
    assert readSourceData.count() == ctest['statics']['read_df_rows']

@pytest.mark.usefixtures("transformData")
def test_transformData(transformData):
    assert transformData.count() == ctest['statics']['transformation_df_rows']

@pytest.mark.usefixtures("saveData")
def test_saveData(saveData):
    assert saveData

@pytest.mark.usefixtures("readSavedData")
def test_readSavedData(transformData, readSavedData):
    assert readSavedData.subtract(transformData).count() == 0
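The readSavedData fixture itself is not shown above; purely as a sketch (the spark fixture and the targetTable key here are assumptions, not my actual code), it could read the saved rows back over JDBC:

@pytest.fixture(scope = "session")
def readSavedData(spark, saveData):
    # hypothetical sketch: read back what saveData wrote so test_readSavedData
    # can compare it with transformData; connection details mirror the other
    # snippets, the 'targetTable' key is illustrative only
    return spark.read. \
        format("jdbc"). \
        option("url", test_url). \
        option("dbtable", ctest['statics']['targetTable']). \
        option("user", ctest['statics']['user']). \
        option("password", ctest['statics']['password']). \
        option("driver", ctest['statics']['driver']). \
        load()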

This is an illustration from PyCharm of the directory structure under tests:


[image.png: PyCharm directory structure under the tests package]


Let me know your thoughts.


Cheers,


Mich


LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



Re: Testing ETL with Spark using Pytest

Jerry Vinokurov
Hi Mich,

I'm a bit confused by what you mean when you say that you cannot call a fixture in another fixture. The fixtures resolve dependencies among themselves by means of their named parameters. So that means that if I have a fixture

@pytest.fixture
def fixture1():
    return SomeObj()

and another fixture

@pytest.fixture
def fixture2(fixture1):
    return do_something_with_obj(fixture1)

my second fixture will simply receive the object created by the first. As such, you do not need to "call" the second fixture at all. Of course, if you had some use case where you were constructing an object in the second fixture, you could have the first return a class, or you could have it return a function. In fact, I have fixtures in a project that do both. Here they are:

@pytest.fixture
def func():
    def foo(x, y, z):
        return (x + y) * z
    return foo
That's a fixture that returns a function, and any test using the func fixture would receive that actual function as a value, which could then be invoked by calling e.g. func(1, 2, 3). Here's another fixture that's more like what you're doing:


@pytest.fixture
def data_frame():
    return pd.DataFrame.from_records([(1, 2, 3), (4, 5, 6)], columns=['x', 'y', 'z'])
This one just returns a data frame that can be operated on.
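A test using it just declares the fixture as a parameter, e.g. (following directly from the fixture above):

def test_data_frame(data_frame):
    # pytest injects the DataFrame returned by the data_frame fixture
    assert list(data_frame.columns) == ['x', 'y', 'z']
    assert data_frame['z'].sum() == 9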

Looking at your setup, I don't want to say that it's wrong per se (it could be very appropriate to your specific project to split things up among these many files) but I would say that it's not idiomatic usage of pytest fixtures, in my experience. It feels to me like you're jumping through a lot of hoops to set up something that could be done quite easily and compactly in conftest.py. I do want to emphasize that there is no limitation on how fixtures can be used within functions or within other fixtures (which are also just functions), since the result of the fixture call is just some Python object.
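For example, a compact conftest.py for a pipeline like yours might look something like this (a minimal sketch with a local SparkSession and made-up data, not your actual code):

# conftest.py -- illustrative sketch only
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope = "session")
def spark():
    # one Spark session shared across the whole test run
    return SparkSession.builder.master("local[2]").appName("etl-tests").getOrCreate()

@pytest.fixture(scope = "session")
def source_df(spark):
    # stand-in for the extract step; the real fixture would read from Hive over JDBC
    return spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

@pytest.fixture(scope = "session")
def transformed_df(source_df):
    # stand-in for the transform step
    return source_df.where("id > 1")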

Hope this helps,
Jerry


Re: Testing ETL with Spark using Pytest

Mich Talebzadeh
Thanks Jerry for your comments.

The easiest option, and I concur, is to have all these fixture files currently under the fixtures package lumped together in conftest.py under the tests package.

Then you can do away with the fixtures package altogether and it works. However, I gather plug and play becomes less manageable when you have a large number of fixtures (large being relative here). My main modules (not the tests) are designed to do ETL from any database that supports JDBC connections (bar Google BigQuery, which only works correctly with the Spark API). You specify your source DB and target DB in a yml file for any pluggable JDBC database.
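For illustration only, config.py could load that yml with something along these lines (the key layout beyond what is quoted in this thread is an assumption):

# config.py -- minimal sketch using PyYAML; key names beyond those quoted
# elsewhere in the thread are illustrative
import yaml

with open("config.yml") as f:
    config = yaml.safe_load(f)

# test statics (ctest) could simply live under their own top-level key
ctest = config.get("test", {})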

Going back to Pytest, please check the reference below for the reason for the fixtures packaging:


With regard to your other point on fixtures (a fixture in each file), I have this fixture loadIntoMysqlTable(), which uses the dataframe created in extractHiveData (sample records read from Hive) and populates the MySQL test table. The input needed is the dataframe constructed in the fixture module extractHiveData, which is passed to it as a parameter. This is the only way it seems to work in my tests:


@pytest.fixture(scope = "session")
def extractHiveData():
    # read data through jdbc from Hive
    spark_session = s.spark_session(ctest['common']['appName'])
    tableName = config['GCPVariables']['sourceTable']
    fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' + tableName
    house_df = s.loadTableFromHiveJDBC(spark_session, fullyQualifiedTableName)
    # sample data selected equally n rows from Kensington and Chelsea and n rows from City of Westminster
    num_rows = int(ctest['statics']['read_df_rows']/2)
    house_df = house_df.filter(col("regionname") == "Kensington and Chelsea").limit(num_rows).unionAll(house_df.filter(col("regionname") == "City of Westminster").limit(num_rows))
    return house_df

@pytest.fixture(scope = "session")
def loadIntoMysqlTable(extractHiveData):
    try:
        extractHiveData. \
            write. \
            format("jdbc"). \
            option("url", test_url). \
            option("dbtable", ctest['statics']['sourceTable']). \
            option("user", ctest['statics']['user']). \
            option("password", ctest['statics']['password']). \
            option("driver", ctest['statics']['driver']). \
            mode(ctest['statics']['mode']). \
            save()
        return True
    except Exception as e:
        print(f"""{e}, quitting""")
        sys.exit(1)

Thanks again.


Mich



Re: Testing ETL with Spark using Pytest

Jerry Vinokurov
Sure, I think it makes sense in many cases to break things up like this. Looking at your other example I'd say that you might want to break up extractHiveData into several fixtures (one for session, one for config, one for the df) because in my experience fixtures like those are reused constantly across a test suite. In general I try to keep my fixtures to one concrete task only, so that if I find myself repeating a pattern I just factor it out into another fixture.
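A rough sketch of that split, reusing the names from your snippet (illustrative only, not a drop-in replacement):

@pytest.fixture(scope = "session")
def spark_session():
    # one Spark session for the whole test run
    return s.spark_session(ctest['common']['appName'])

@pytest.fixture(scope = "session")
def fullyQualifiedTableName():
    # table name resolved from the config
    return config['hiveVariables']['DSDB'] + '.' + config['GCPVariables']['sourceTable']

@pytest.fixture(scope = "session")
def house_df(spark_session, fullyQualifiedTableName):
    # just the extract; the sampling/filtering could be yet another fixture
    return s.loadTableFromHiveJDBC(spark_session, fullyQualifiedTableName)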


Re: Testing ETL with Spark using Pytest

Mich Talebzadeh
Interesting points, Jerry. I do not know how much benefit atomising the unit tests brings.

For example we have

@pytest.fixture(scope = "session")
def extractHiveData():
    # read data through jdbc from Hive
    spark_session = s.spark_session(ctest['common']['appName'])
    tableName = config['GCPVariables']['sourceTable']
    fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' + tableName
    house_df = s.loadTableFromHiveJDBC(spark_session, fullyQualifiedTableName)
    # sample data selected equally n rows from Kensington and Chelsea and n rows from City of Westminster
    num_rows = int(ctest['statics']['read_df_rows']/2)
    house_df = house_df.filter(col("regionname") == "Kensington and Chelsea").limit(num_rows).unionAll(house_df.filter(col("regionname") == "City of Westminster").limit(num_rows))
    return house_df

Notes:

That spark_session is imported from a package and has been tested many times.

The config static values are read through a Python file config.py, which in turn reads a yml file config.yml.

The important one to test is house_df, the dataframe read from the Hive table. That can fail for a variety of reasons:

  1. The Hive driver used is old or out of date
  2. The Hive driver does not support kerberized access, which may be the case in production

So any unit testing is going to be limited in scope. Another point is that if the extract module fails, you are going to know that by calling it, and it can probably be rectified pretty quickly. It is always the issue of coverage: how much testing needs to be covered.


HTH




Re: Testing ETL with Spark using Pytest

Marco Mistroni
Hey Mich,
My 2 cents on top of Jerry's.
For reusable @fixtures across your tests, I'd leverage conftest.py and put all of them there, if the number is not too big. Otherwise, as you say, you can create tests\fixtures and place them all there.

In terms of extractHiveData: for a @fixture it is doing too much.
A fixture in pytest (anyone correct me if I'm wrong) is just an object you can reuse across tests, something like the example below. It should contain very minimal code; I'd say not more than 3 lines.

@pytest.fixture
def spark():
    return SparkSession()....

def test_mydataframe(spark):
    mydf = spark.table("mypreferredtable")

It seems to me your extractHiveData is doing too much.
IMHO it should be something like:

@pytest.fixture
def hive_extractor():
    return <s>

@pytest.fixture
def default_config():
    return <a default instance of your config>

def test_extraction_from_hive(spark, hive_extractor, default_config):
    tableName = default_config['GCPVariables']['sourceTable']
    fullyQualifiedTableName = default_config['hiveVariables']['DSDB'] + '.' + tableName
    house_df = hive_extractor.loadTableFromHiveJDBC(spark, fullyQualifiedTableName)
    # To test your dataframe, do something like this
    test_df_pandas = <pandas>.from_csv("""regionName,col2,col3
                                  Kensington and chelsea,Value2,Value3""")
    test_df = spark.createDataFrame(test_df_pandas)
    result_df = house_df.subtract(test_df)
    assert result_df.count() == 0
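For the <pandas> placeholder above, one concrete option, just as an illustration, is to build the small expected dataframe from an in-memory CSV string:

import io
import pandas as pd

# illustrative stand-in for the <pandas>.from_csv placeholder above;
# the column names are just the ones from the example
test_df_pandas = pd.read_csv(io.StringIO(
    "regionName,col2,col3\n"
    "Kensington and chelsea,Value2,Value3\n"
))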

As always, pls feel free to disagree... I haven't done much on pytest/fixtures, but this is how I'd restructure it.

hth
 Marco




Re: Testing ETL with Spark using Pytest

Mich Talebzadeh
Many thanks Marco.

Points noted, and other points/criticism are equally welcome. In a forum like this we do not disagree, we just agree to differ, so to speak, and share ideas.

I will review my code and take your suggestions on board.

regards,

Mich




LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



On Tue, 9 Feb 2021 at 18:09, Sofia’s World <[hidden email]> wrote:
Hey Mich
 my 2 cents on top of Jerry's.
for reusable @fixtures across your tests, i'd leverage conftest.py and put all of them there  -if number is not too big. OW. as you say, you can create  tests\fixtures where you place all of them there

in term of extractHiveDAta.... for a @fixture it is doing too much
A fixture in pytest - anyone correct if wrong - its just an object you can reuse across tests, something like this below. it should contain very  minimal code.. I'd say not more than 3 lines..

@fixture
def spark():
     return SparkSession()....

def test_mydataframe(spark):
   mydf = spark.table("mypreferredtable")

It seems to me your extractHiveDAta is doing too much.
IMHO it should be:

@pytest.fixture
def hive_extractor():
     return <s>

@pytext.fixture
def default_config():
     return <a default instance of your config>

def test_extraction_from_hive(spark, hive_extractor, default_config):
      tableName = config['GCPVariables']['sourceTable']
   fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' + tableName
   house_df = s.loadTableFromHiveJDBC(spark_session, fullyQualifiedTableName)
   # To test your dataframe, do something like this
   test_df_pandas  =  <pandas>.from_csv("""regionName,col2,col3
                                    Kensington and chelsea,Value2,Value3""")
   test_df = spark.createDataFrame(test_df_pandas) 
   result_df = house_df.subtract(test_df)
   self.assertEquals(0, result_df.count())

as always, pls feel free to disagree.... havent done much on pytest/ fixtures but this is how i'd restructure...... 

hth
 Marco



On Tue, Feb 9, 2021 at 5:37 PM Mich Talebzadeh <[hidden email]> wrote:
Interesting points Jerry. I do not know how much atomising the unit test brings benefit.

For example we have

@pytest.fixture(scope = "session")
def extractHiveData():
    # read data through jdbc from Hive
    spark_session = s.spark_session(ctest['common']['appName'])
    tableName = config['GCPVariables']['sourceTable']
    fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' + tableName
   house_df = s.loadTableFromHiveJDBC(spark_session, fullyQualifiedTableName)
    # sample data selected equally n rows from Kensington and Chelsea and n rows from City of Westminster
    num_rows = int(ctest['statics']['read_df_rows']/2)
    house_df = house_df.filter(col("regionname") == "Kensington and Chelsea").limit(num_rows).unionAll(house_df.filter(col("regionname") == "City of Westminster").limit(num_rows))
    return house_df

Notes:

That spark_session is imported from a packaged and has been tested many times

The config static values are read through a python file config.py in turn reading a yml file config.yml

The important ones to test is house_df, the data frame to read from the Hive table. That can fail for a variety of reasons.

  1. The Hive driver used is old or out of date
  2. The Hive driver does not support kerberized access that may be the case in production
So any unit testing is going to be limited by scope. Also another point being is that if the extract data module fails then you are going to know that by calling it and probably can be rectified pretty quick. It is always the issue of coverage. How much testing needs to be covered.


HTH




On Tue, 9 Feb 2021 at 16:34, Jerry Vinokurov <[hidden email]> wrote:
Sure, I think it makes sense in many cases to break things up like this. Looking at your other example I'd say that you might want to break up extractHiveData into several fixtures (one for session, one for config, one for the df) because in my experience fixtures like those are reused constantly across a test suite. In general I try to keep my fixtures to one concrete task only, so that if I find myself repeating a pattern I just factor it out into another fixture.
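For example, the split Jerry describes might look roughly like this sketch; the fixture names are illustrative, while s, config and ctest are the helpers already used in Mich's code above:

# splitting extractHiveData into smaller, reusable fixtures (names are illustrative)
import pytest

@pytest.fixture(scope="session")
def spark_session_fixture():
    # the shared Spark session
    return s.spark_session(ctest['common']['appName'])

@pytest.fixture(scope="session")
def source_config():
    # the static configuration read from the yml file
    return config

@pytest.fixture(scope="session")
def house_df(spark_session_fixture, source_config):
    # the raw dataframe read from the Hive table
    tableName = source_config['GCPVariables']['sourceTable']
    fullyQualifiedTableName = source_config['hiveVariables']['DSDB'] + '.' + tableName
    return s.loadTableFromHiveJDBC(spark_session_fixture, fullyQualifiedTableName)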

On Tue, Feb 9, 2021 at 11:14 AM Mich Talebzadeh <[hidden email]> wrote:
Thanks Jerry for your comments.

The easiest option, and I concur, is to have all these fixture files currently under the fixtures package lumped together in conftest.py under the tests package.

Then you can get away from the fixtures package altogether and it works. However, I gather plug and play becomes less manageable when you have a large number of fixtures (large being relative here). My main modules (not the tests) are designed to do ETL from any database that supports JDBC connections (bar Google BigQuery, which only works correctly through the Spark API). You specify your source DB and target DB in a yml file for any pluggable JDBC database.

Going back to Pytest, please check this reference below for the reasoning behind packaging fixtures


With regard to your other point on fixtures (one fixture per file), I have the fixture loadIntoMysqlTable(), which takes the data frame created in extractHiveData (sample records read from Hive) and populates the MySql test table. The input it needs is the DataFrame constructed in the fixture module extractHiveData, which is passed in as a parameter. This is the only way it seems to work through my tests.


@pytest.fixture(scope = "session")
def extractHiveData():
    # read data through jdbc from Hive
    spark_session = s.spark_session(ctest['common']['appName'])
    tableName = config['GCPVariables']['sourceTable']
    fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' + tableName
   house_df = s.loadTableFromHiveJDBC(spark_session, fullyQualifiedTableName)
    # sample data selected equally n rows from Kensington and Chelsea and n rows from City of Westminster
    num_rows = int(ctest['statics']['read_df_rows']/2)
    house_df = house_df.filter(col("regionname") == "Kensington and Chelsea").limit(num_rows).unionAll(house_df.filter(col("regionname") == "City of Westminster").limit(num_rows))
    return house_df

@pytest.fixture(scope = "session")
def loadIntoMysqlTable(extractHiveData):
    # write the sampled Hive DataFrame to the MySql test table via JDBC
    try:
        extractHiveData. \
            write. \
            format("jdbc"). \
            option("url", test_url). \
            option("dbtable", ctest['statics']['sourceTable']). \
            option("user", ctest['statics']['user']). \
            option("password", ctest['statics']['password']). \
            option("driver", ctest['statics']['driver']). \
            mode(ctest['statics']['mode']). \
            save()
        return True
    except Exception as e:
        print(f"""{e}, quitting""")
        sys.exit(1)

Thanks again.


Mich




On Tue, 9 Feb 2021 at 15:47, Jerry Vinokurov <[hidden email]> wrote:
Hi Mich,

I'm a bit confused by what you mean when you say that you cannot call a fixture in another fixture. The fixtures resolve dependencies among themselves by means of their named parameters. So that means that if I have a fixture

@pytest.fixture
def fixture1():
    return SomeObj()

and another fixture

@pytest.fixture
def fixture2(fixture1):
    return do_something_with_obj(fixture1)

my second fixture will simply receive the object created by the first. As such, you do not need to "call" the first fixture inside the second at all. Of course, if you had some use case where you were constructing an object in the second fixture, you could have the first return a class, or you could have it return a function. In fact, I have fixtures in a project that do both. Here they are:

@pytest.fixture
def func():
    def foo(x, y, z):
        return (x + y) * z
    return foo
That's a fixture that returns a function, and any test using the func fixture would receive that actual function as a value, which could then be invoked by calling e.g. func(1, 2, 3). Here's another fixture that's more like what you're doing:


@pytest.fixture
def data_frame():
    return pd.DataFrame.from_records([(1, 2, 3), (4, 5, 6)], columns=['x', 'y', 'z'])
This one just returns a data frame that can be operated on.
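As an aside, a test consuming that fixture might look like the sketch below; the test name and assertions are purely illustrative:

# illustrative test using the data_frame fixture above
def test_data_frame_shape(data_frame):
    # pytest injects the DataFrame built by the fixture -- no explicit call needed
    assert list(data_frame.columns) == ['x', 'y', 'z']
    assert len(data_frame) == 2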

Looking at your setup, I don't want to say that it's wrong per se (it could be very appropriate to your specific project to split things up among these many files) but I would say that it's not idiomatic usage of pytest fixtures, in my experience. It feels to me like you're jumping through a lot of hoops to set up something that could be done quite easily and compactly in conftest.py. I do want to emphasize that there is no limitation on how fixtures can be used within functions or within other fixtures (which are also just functions), since the result of the fixture call is just some Python object.

Hope this helps,
Jerry
