How to mock actors in Ray Data pipelines
When running a Ray Data pipeline, you may want to mock methods in a callable class during CI. This turns out to be a bit tricky, since under the hood Ray Data serializes the class and constructs it inside separate worker processes. In this post, I'll show a pattern that works.
Take the following simple Ray Data pipeline. It defines a CallableClass that we want to patch so that it doesn't call mock_me:
# patching_ray/common.py
def mock_me():
    raise Exception("Patching unsuccessful >:C")

# patching_ray/actor1.py
import ray

from patching_ray.common import mock_me


class CallableClass:
    def __init__(self):
        self.attribute = mock_me()

    def __call__(self, batch):
        return batch


def main():
    mock_me()
    print("#" * 50)
    print(f"Successfully mocked mock_me in main(): {type(mock_me)}")
    print("#" * 50)
    ds = ray.data.range(1000)
    ds = ds.map_batches(CallableClass, concurrency=1)
    df = ds.to_pandas()
    print(df)


if __name__ == "__main__":
    main()
The obvious thing to try is just mocking out mock_me:
# tests/test_util.py
def mock_mock_me():
    print("mock_me successfully patched in common.py")

# tests/test_actor1_option1.py
from unittest.mock import patch

from patching_ray.actor1 import main
from tests.test_util import mock_mock_me


@patch("patching_ray.actor1.mock_me", mock_mock_me)
def test_actor1_option_1():
    main()
This successfully mocks mock_me in the main() scope; however, it does not patch the mock_me that is called inside the CallableClass actor.
Another approach is to patch the CallableClass.__init__ function:
from unittest.mock import patch

from patching_ray.actor1 import main
from tests.test_util import mock_mock_me


def mock_actor1_init(self, *args, **kwargs):
    print("mock_me successfully patched CallableClass.__init__")
    self.attribute = mock_mock_me()


@patch("patching_ray.actor1.mock_me", mock_mock_me)
@patch("patching_ray.actor1.CallableClass.__init__", mock_actor1_init)
def test_actor1_option_2():
    main()
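...however, this does not work either. As far as I can tell, the patch only exists in the test process, while Ray Data constructs CallableClass inside a separate worker process that never sees it. In fact, patching the entire CallableClass doesn't work either.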
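To see why a patch does not cross a process boundary, here is a minimal sketch that does not use Ray at all. It patches a standard-library function in the current process and then asks a fresh Python interpreter whether it sees the patch. The fresh interpreter is only a stand-in for a Ray worker (an analogy, not Ray's actual serialization path), and fake_node and CHILD_CODE are names made up for this illustration.

```python
import platform
import subprocess
import sys
from unittest.mock import patch


def fake_node():
    # Hypothetical stand-in for the mocked function.
    return "patched-node"


# Code run by a fresh interpreter, standing in for a worker process.
CHILD_CODE = "import platform; print(platform.node() == 'patched-node')"

with patch("platform.node", fake_node):
    # Inside this process the patch is active.
    seen_in_parent = platform.node()

    # The child imports platform on its own, so it gets the real function.
    child = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        capture_output=True,
        text=True,
        check=True,
    )
    child_saw_patch = child.stdout.strip()

print(f"parent sees: {seen_in_parent}")
print(f"child saw the patch: {child_saw_patch}")
```

The parent sees the patched value, while the child reports that it did not, which matches the behaviour of the two failed attempts above.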
Solutions
The most elegant solution is to pass in the function to be mocked to the class:
import ray

from patching_ray.common import mock_me


class CallableClass:
    def __init__(self, loader):
        self.attribute = loader()

    def __call__(self, batch):
        return batch


def main():
    mock_me()
    print("#" * 50)
    print(f"Successfully mocked mock_me in main(): {type(mock_me)}")
    print("#" * 50)
    ds = ray.data.range(1000)
    ds = ds.map_batches(CallableClass, concurrency=1, fn_constructor_kwargs={"loader": mock_me})
    print(ds.to_pandas())


if __name__ == "__main__":
    main()
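This allows us to patch mock_me in the outer scope, before we initialize the actor. The name mock_me is looked up inside main(), where the patch is active, so the patched function is what gets passed to the actor as a constructor argument.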
Another thing you could do is add a testing-mode flag, something like mock_me(testing=True). This is not really Pythonic, but it works.
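A rough sketch of the flag approach, assuming the flag is threaded through the class constructor (the testing parameter on CallableClass and the stub return value are my own illustration, not code from the repo):

```python
def mock_me(testing=False):
    if testing:
        # Short-circuit in tests instead of doing the real work.
        return "stub value"
    raise Exception("Patching unsuccessful >:C")


class CallableClass:
    def __init__(self, testing=False):
        # Forward the flag so the actor itself decides what to call.
        self.attribute = mock_me(testing=testing)

    def __call__(self, batch):
        return batch


test_actor = CallableClass(testing=True)
print(test_actor.attribute)
```

In the pipeline, the flag could be passed with fn_constructor_kwargs={"testing": True}, the same parameter used in the solution above. The downside is that test-only branches end up living in production code.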
This code is available in this repo with the directories and tests already set up.