I was a bit confused by the use of fixtures in pytest when DataFrames are passed as a pipeline from one fixture to the next. I wrote this up after spending some time on it. As usual it is heuristic rather than strictly by the book, so to speak.
With PySpark and PyCharm you can ETL from Hive to BigQuery, from Oracle to Hive, etc. For pytest, however, I decided to use MySQL as the test database, with a small sample of data (200 rows). I mentioned fixtures. Simply put, "fixtures are functions that run before each test function to which they are applied, in order to prepare data. Fixtures are used to feed the tests with resources such as data or database connections." If you have an ordering like read data (Extract), do something with it (Transform) and save it somewhere (Load) using Spark, then all of this happens in memory, with DataFrames feeding each other.
The crucial thing to remember is that fixtures are passed to each other as named parameters; you do not invoke them directly!
Example

## This is correct: the fixture is requested as a parameter
@pytest.fixture(scope = "session")
def transformData(readSourceData):

## This is incorrect: you cannot call a fixture directly inside another fixture
    read_df = readSourceData()

So the operation becomes

    transformation_df = readSourceData. \
        select( \
        ....
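To make the pattern concrete, here is a minimal, self-contained sketch of such a transformData fixture. The column names (regionname, averageprice) are only illustrative, not from my actual tables:

import pytest
from pyspark.sql.functions import col, round as spark_round

@pytest.fixture(scope="session")
def transformData(readSourceData):
    # readSourceData is injected by pytest as the DataFrame returned by that fixture;
    # we never call readSourceData() ourselves
    transformation_df = readSourceData.select(
        col("regionname"),
        spark_round(col("averageprice"), 2).alias("averageprice")
    )
    return transformation_df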
Say in PyCharm, under the tests package, you create a package called "fixtures" (just a name, nothing to do with pytest's own "fixture") and in there you put the ETL Python modules that prepare data for you. Example
### file --> saveData.py
@pytest.fixture(scope = "session")
def saveData(transformData):
    # Write to the test target table
    try:
        transformData. \
            write. \
            format("jdbc"). \
            ....
You then drive the tests by creating a file called conftest.py under the tests package. You make the fixtures available by importing them in this file, as below
import pytest
from tests.fixtures.extractHiveData import extractHiveData
from tests.fixtures.loadIntoMysqlTable import loadIntoMysqlTable
from tests.fixtures.readSourceData import readSourceData
from tests.fixtures.transformData import transformData
from tests.fixtures.saveData import saveData
from tests.fixtures.readSavedData import readSavedData
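As a side note, instead of importing each fixture function by name, the same thing can be done by declaring the fixture modules as plugins. Note this only works when this conftest.py is the top-level one, so treat it as a sketch:

# conftest.py (top-level) - alternative to the explicit imports above.
# pytest loads each listed module as a plugin, making its fixtures available
# to all tests without importing the functions by name.
pytest_plugins = [
    "tests.fixtures.extractHiveData",
    "tests.fixtures.loadIntoMysqlTable",
    "tests.fixtures.readSourceData",
    "tests.fixtures.transformData",
    "tests.fixtures.saveData",
    "tests.fixtures.readSavedData",
]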
Then you have your test Python file, say test_oracle.py, under the tests package, and you put your assertions there
import pytest
from src.config import ctest

@pytest.mark.usefixtures("extractHiveData")
def test_extract(extractHiveData):
    assert extractHiveData.count() > 0

@pytest.mark.usefixtures("loadIntoMysqlTable")
def test_loadIntoMysqlTable(loadIntoMysqlTable):
    assert loadIntoMysqlTable

@pytest.mark.usefixtures("readSourceData")
def test_readSourceData(readSourceData):
    assert readSourceData.count() == ctest['statics']['read_df_rows']

@pytest.mark.usefixtures("transformData")
def test_transformData(transformData):
    assert transformData.count() == ctest['statics']['transformation_df_rows']

@pytest.mark.usefixtures("saveData")
def test_saveData(saveData):
    assert saveData

@pytest.mark.usefixtures("readSavedData")
def test_readSavedData(transformData, readSavedData):
    assert readSavedData.subtract(transformData).count() == 0
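Note that the assertions above rely on the fixture dependencies rather than on test ordering: requesting a fixture pulls in everything it depends on, and scope="session" means each stage runs only once. A minimal, self-contained sketch of that behaviour (no Spark involved, names made up):

import pytest

calls = []

@pytest.fixture(scope="session")
def extract():
    calls.append("extract")
    return [1, 2, 3]

@pytest.fixture(scope="session")
def transform(extract):
    calls.append("transform")
    return [x * 10 for x in extract]

def test_transform_runs_whole_chain(transform):
    # requesting transform forces extract to run first
    assert transform == [10, 20, 30]
    assert calls == ["extract", "transform"]

def test_session_scope_reuses_results(transform):
    # session scope: neither fixture runs again for this test
    assert calls == ["extract", "transform"]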
This is an illustration from PyCharm of the directory structure under tests:
[PyCharm screenshot: directory structure under the tests package]
Let me know your thoughts.
Cheers,
Mich
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction
of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from such
loss, damage or destruction.
Hi Mich,
I'm a bit confused by what you mean when you say that you cannot call a fixture in another fixture. The fixtures resolve dependencies among themselves by means of their named parameters. So that means that if I have a fixture
@pytest.fixture
def fixture1():
    return SomeObj()
and another fixture
@pytest.fixture
def fixture2(fixture1):
    return do_something_with_obj(fixture1)
my second fixture will simply receive the object created by the first. As such, you do not need to "call" the second fixture at all. Of course, if you had some use case where you were constructing an object in the second fixture, you could have the first return a class, or you could have it return a function. In fact, I have fixtures in a project that do both. Here they are:
@pytest.fixture
def func():
    def foo(x, y, z):
        return (x + y) * z
    return foo

That's a fixture that returns a function, and any test using the func fixture would receive that actual function as a value, which could then be invoked by calling e.g. func(1, 2, 3). Here's another fixture that's more like what you're doing:
@pytest.fixture
def data_frame():
    return pd.DataFrame.from_records([(1, 2, 3), (4, 5, 6)], columns=['x', 'y', 'z'])

This one just returns a data frame that can be operated on.
Looking at your setup, I don't want to say that it's wrong per se (it could be very appropriate to your specific project to split things up among these many files) but I would say that it's not idiomatic usage of pytest fixtures, in my experience. It feels to me like you're jumping through a lot of hoops to set up something that could be done quite easily and compactly in conftest.py. I do want to emphasize that there is no limitation on how fixtures can be used within functions or within other fixtures (which are also just functions), since the result of the fixture call is just some Python object.
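For instance, a compact conftest.py for a pipeline like yours could look roughly like this (just a sketch; the fixture bodies are placeholders rather than your actual code):

# conftest.py - a sketch of the consolidated layout; bodies are placeholders
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.appName("pytest-etl").getOrCreate()

@pytest.fixture(scope="session")
def extractHiveData(spark):
    # stand-in for the real Hive read
    return spark.createDataFrame([(1, "Kensington and Chelsea")], ["id", "regionname"])

@pytest.fixture(scope="session")
def transformData(extractHiveData):
    return extractHiveData.select("regionname")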
Hope this helps, Jerry
Thanks Jerry for your comments.
The easiest option, and I concur, is to take all these fixture files currently under the fixtures package and lump them together in conftest.py under the tests package.
Then you can do away with the fixtures package altogether and it works. However, I gather plug and play becomes less manageable when you have a large number of fixtures (large being relative here). My main modules (not the tests) are designed to do ETL from any database that supports JDBC connections (bar Google BigQuery, which only works correctly with the Spark API). You specify your source DB and target DB in a yml file for any pluggable JDBC database.
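For illustration, the Python side of that could be as simple as the sketch below; the file name and keys are made up, not my actual config:

import yaml  # PyYAML

def load_db_config(path="config.yml"):
    # hypothetical loader: returns the source and target JDBC settings
    with open(path) as f:
        cfg = yaml.safe_load(f)
    return cfg["source_db"], cfg["target_db"]

# Illustrative structure expected in config.yml:
#   source_db: {driver: com.mysql.cj.jdbc.Driver, url: "jdbc:mysql://host:3306/db", user: u, password: p}
#   target_db: {driver: oracle.jdbc.OracleDriver, url: "jdbc:oracle:thin:@host:1521/svc", user: u, password: p}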
Going back to pytest, please check the reference below for the reasoning behind packaging fixtures.
With regard to your other point on fixtures (one fixture per file), I have this fixture loadIntoMysqlTable() which takes the DataFrame created in extractHiveData (sample records read from Hive) and populates the MySQL test table. The input it needs is the DataFrame constructed in the fixture module extractHiveData, which is passed in as a parameter. This is the only way it seems to work through my tests.
@pytest.fixture(scope = "session")
def extractHiveData():
    # read data through JDBC from Hive
    spark_session = s.spark_session(ctest['common']['appName'])
    tableName = config['GCPVariables']['sourceTable']
    fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' + tableName
    house_df = s.loadTableFromHiveJDBC(spark_session, fullyQualifiedTableName)
    # sample data: n rows from Kensington and Chelsea and n rows from City of Westminster
    num_rows = int(ctest['statics']['read_df_rows']/2)
    house_df = house_df.filter(col("regionname") == "Kensington and Chelsea").limit(num_rows). \
        unionAll(house_df.filter(col("regionname") == "City of Westminster").limit(num_rows))
    return house_df
@pytest.fixture(scope = "session")
def loadIntoMysqlTable(extractHiveData):
    try:
        extractHiveData. \
            write. \
            format("jdbc"). \
            option("url", test_url). \
            option("dbtable", ctest['statics']['sourceTable']). \
            option("user", ctest['statics']['user']). \
            option("password", ctest['statics']['password']). \
            option("driver", ctest['statics']['driver']). \
            mode(ctest['statics']['mode']). \
            save()
        return True
    except Exception as e:
        print(f"""{e}, quitting""")
        sys.exit(1)
Thanks again.
Mich
Sure, I think it makes sense in many cases to break things up like this. Looking at your other example, I'd say that you might want to break up extractHiveData into several fixtures (one for the session, one for the config, one for the df), because in my experience fixtures like those are reused constantly across a test suite. In general I try to keep my fixtures to one concrete task only, so that if I find myself repeating a pattern I just factor it out into another fixture.
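Something along these lines, say (a rough sketch only; s, ctest and config are the helpers from your own modules, and the bodies are illustrative):

import pytest

@pytest.fixture(scope="session")
def spark_session():
    return s.spark_session(ctest['common']['appName'])

@pytest.fixture(scope="session")
def source_table_name():
    return config['hiveVariables']['DSDB'] + '.' + config['GCPVariables']['sourceTable']

@pytest.fixture(scope="session")
def extractHiveData(spark_session, source_table_name):
    # only the Hive read itself; the sampling can live in yet another fixture
    return s.loadTableFromHiveJDBC(spark_session, source_table_name)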
Interesting points, Jerry. I am not sure how much benefit atomising the unit tests brings.
For example we have
@pytest.fixture(scope = "session")
def extractHiveData():
    # read data through JDBC from Hive
    spark_session = s.spark_session(ctest['common']['appName'])
    tableName = config['GCPVariables']['sourceTable']
    fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' + tableName
    house_df = s.loadTableFromHiveJDBC(spark_session, fullyQualifiedTableName)
    # sample data: n rows from Kensington and Chelsea and n rows from City of Westminster
    num_rows = int(ctest['statics']['read_df_rows']/2)
    house_df = house_df.filter(col("regionname") == "Kensington and Chelsea").limit(num_rows). \
        unionAll(house_df.filter(col("regionname") == "City of Westminster").limit(num_rows))
    return house_df
Notes:
That spark_session is imported from a package and has been tested many times.
The config static values are read through a Python file, config.py, which in turn reads a yml file, config.yml.
The important thing to test is house_df, the DataFrame read from the Hive table. That can fail for a variety of reasons:
- The Hive driver used is old or out of date
- The Hive driver does not support the Kerberized access that may be required in production
So any unit testing is going to be limited in scope. Another point is that if the extract data module fails, you are going to know that by calling it, and it can probably be rectified pretty quickly. It is always the question of coverage: how much testing needs to be covered.
HTH
Hey Mich, my 2 cents on top of Jerry's. For reusable fixtures across your tests, I'd leverage conftest.py and put all of them there, if the number is not too big. Otherwise, as you say, you can create tests/fixtures and place all of them there.
In terms of extractHiveData: for a fixture it is doing too much. A fixture in pytest - anyone correct me if I'm wrong - is just an object you can reuse across tests, something like the example below. It should contain very minimal code, I'd say no more than 3 lines.
@fixture
def spark():
    return SparkSession()....

def test_mydataframe(spark):
    mydf = spark.table("mypreferredtable")
It seems to me your extractHiveData is doing too much.
IMHO it should be:
@pytest.fixture
def hive_extractor():
    return <s>

@pytest.fixture
def default_config():
    return <a default instance of your config>

def test_extraction_from_hive(spark, hive_extractor, default_config):
    tableName = default_config['GCPVariables']['sourceTable']
    fullyQualifiedTableName = default_config['hiveVariables']['DSDB'] + '.' + tableName
    house_df = hive_extractor.loadTableFromHiveJDBC(spark, fullyQualifiedTableName)
    # To test your dataframe, do something like this
    test_df_pandas = <pandas>.from_csv("""regionName,col2,col3
    Kensington and chelsea,Value2,Value3""")
    test_df = spark.createDataFrame(test_df_pandas)
    result_df = house_df.subtract(test_df)
    assert result_df.count() == 0
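Spelled out a little more completely it might look like the sketch below (hypothetical only: the load call, column names and values are made up):

import pandas as pd

def test_extraction_from_hive(spark, hive_extractor, default_config):
    # load the DataFrame under test (placeholder call; adapt to your extractor)
    house_df = hive_extractor.loadTableFromHiveJDBC(spark, default_config['fullyQualifiedTableName'])

    # build the expected rows in pandas and convert to a Spark DataFrame
    expected_pdf = pd.DataFrame(
        [("Kensington and Chelsea", "Value2", "Value3")],
        columns=["regionName", "col2", "col3"],
    )
    expected_df = spark.createDataFrame(expected_pdf)

    # subtract() returns rows present in house_df but not in expected_df;
    # an empty result means the extracted rows match the expectation
    assert house_df.select("regionName", "col2", "col3").subtract(expected_df).count() == 0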
As always, please feel free to disagree... I haven't done much with pytest fixtures, but this is how I'd restructure it.
hth Marco
Many thanks Marco.
Points noted, and other points and criticism are equally welcome. In a forum like this we do not disagree; we just agree to differ, so to speak, and share ideas.
I will review my code and take on board your suggestions.
regards,

Mich