Skip to content

Modules

curry.block

Block

Bases: BaseModel

A block is a unit of computation in a workflow. It can be connected to other blocks and produce outputs in different formats using producers.

ATTRIBUTE DESCRIPTION
id

The unique identifier of the block.

TYPE: str

name

The name of the block.

TYPE: str

description

The description of the block.

TYPE: str

method_id

The method ID associated with the block.

TYPE: str

parameters

The parameters of the block.

TYPE: AnyDict

connections

The connections of the block.

TYPE: list[BlockConnection]

producers

The producers of the block, keyed by format name.

TYPE: dict[str, BlockProducer]

model_config class-attribute instance-attribute

model_config = ConfigDict(strict=True, extra='forbid')

id class-attribute instance-attribute

id = Field(default_factory=lambda: str(uuid4()))

method_id instance-attribute

method_id

name class-attribute instance-attribute

name = None

description class-attribute instance-attribute

description = None

parameters class-attribute instance-attribute

parameters = {}

connections class-attribute instance-attribute

connections = []

producers class-attribute instance-attribute

producers = {'html': BlockProducer(format_name='html', func=block_as_html_default), 'python_source': BlockProducer(format_name='python_source', description='Get the source code of the producer python function', func=block_method_source_code_default)}

has_producer

has_producer(format_name)

Check if the block has a producer for the given format name.

PARAMETER DESCRIPTION
format_name

TYPE: str

PARAMETER DESCRIPTION
format_name

The name of the format to check.

TYPE: str

RETURNS DESCRIPTION
bool

True if the block has a producer for the given format name, False otherwise.

TYPE: bool

Source code in curry/block/models.py
72
73
74
75
76
77
78
79
80
81
82
def has_producer(self, format_name: str) -> bool:
    """
    Tell whether a producer is registered under ``format_name``.

    Parameters:
        format_name (str): The format name to look up among the block's producers.

    Returns:
        bool: True when ``format_name`` is a key of ``self.producers``, False otherwise.
    """
    registered_producers = self.producers
    return format_name in registered_producers

produce

produce(format_name)
PARAMETER DESCRIPTION
format_name

TYPE: str

Source code in curry/block/models.py
84
85
def produce(self, format_name: str) -> typing.Any:
    """
    Run the producer registered under ``format_name`` on this block.

    Parameters:
        format_name (str): The key of the producer to look up in ``self.producers``.

    Returns:
        typing.Any: Whatever the producer's ``func`` returns when called with this block.

    Raises:
        KeyError: If no producer is registered under ``format_name``.
    """
    selected_producer = self.producers[format_name]
    return selected_producer.func(self)

available_producers

available_producers()

Returns a list of available producers.

RETURNS DESCRIPTION
list[str]

A list of the format names of the available producers.

Source code in curry/block/models.py
87
88
89
90
91
92
93
94
def available_producers(self) -> list[str]:
    """
    List the format names for which this block has a producer.

    Returns:
        list[str]: The keys of ``self.producers``, in the dictionary's iteration order.
    """
    # Iterating a dict yields its keys, so no explicit ``.keys()`` call is needed.
    return list(self.producers)

register_producer

register_producer(producer)
PARAMETER DESCRIPTION
producer

TYPE: BlockProducer

Source code in curry/block/models.py
96
97
98
99
def register_producer(self, producer: BlockProducer) -> None:
    """
    Register ``producer`` on this block under its ``format_name``.

    Parameters:
        producer (BlockProducer): The producer to store in ``self.producers``.

    Returns:
        None
    """
    # NOTE: no duplicate check is performed; a producer already registered
    # under the same format name is silently replaced.
    format_key = producer.format_name
    self.producers[format_key] = producer

from_func classmethod

from_func(func, **kwargs)

Create a Block instance from a given function.

PARAMETER DESCRIPTION
func

TYPE: AnyCallable

**kwargs

TYPE: Any DEFAULT: {}

PARAMETER DESCRIPTION
cls

The class of the Block instance.

TYPE: type

func

The function to create the Block from.

TYPE: AnyCallable

**kwargs

Additional keyword arguments for the Block instance.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Block

The created Block instance.

TYPE: Block

Source code in curry/block/models.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@classmethod
def from_func(cls, func: AnyCallable, **kwargs: typing.Any) -> "Block":
    """
    Build a Block whose identifying fields are derived from a function.

    Each of ``name``, ``description`` and ``method_id`` that is absent from
    ``kwargs`` (or falsy) is filled in from ``func``: the name and the method
    ID fall back to ``func.__name__``, and the description falls back to
    ``func.__doc__`` or, failing that, an empty string.

    Args:
        func (AnyCallable): The function to create the Block from.
        **kwargs (typing.Any): Additional keyword arguments forwarded to the Block constructor.

    Returns:
        Block: The created Block instance.

    """
    # Later keys override the caller's values, so the three derived fields
    # always end up populated; everything else in ``kwargs`` passes through.
    block_fields = {
        **kwargs,
        "name": kwargs.get("name") or func.__name__,
        "description": kwargs.get("description") or func.__doc__ or "",
        "method_id": kwargs.get("method_id") or func.__name__,
    }
    return cls(**block_fields)

BlockConnection

Bases: BaseModel

source_block_id instance-attribute

source_block_id

source_output_name class-attribute instance-attribute

source_output_name = 'output'

self_input_name instance-attribute

self_input_name

BlockProducer

Bases: BaseModel

format_name instance-attribute

format_name

func instance-attribute

func

description class-attribute instance-attribute

description = None

tags class-attribute instance-attribute

tags = []

title class-attribute instance-attribute

title = None

model_config class-attribute instance-attribute

model_config = ConfigDict(arbitrary_types_allowed=True)

curry.flow

submit_workflow

submit_workflow(blocks, execute=True, render=False, render_format='png', render_filename=None)
PARAMETER DESCRIPTION
blocks

TYPE: list[Block]

execute

TYPE: bool DEFAULT: True

render

TYPE: bool DEFAULT: False

render_format

TYPE: Optional[str] DEFAULT: 'png'

render_filename

TYPE: Optional[str] DEFAULT: None

Source code in curry/flow/workflow.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def submit_workflow(
    blocks: list[Block],
    execute: bool = True,
    render: bool = False,
    render_format: typing.Optional[str] = "png",
    render_filename: typing.Optional[str] = None,
) -> AnyDict:
    """
    Build a Dask task graph from ``blocks`` and optionally compute and/or render it.

    Blocks are processed in the order given. Each block's method is looked up
    through ``MethodManager`` and wrapped with ``delayed``; its keyword
    arguments are the block's ``parameters`` overlaid with one entry per
    connection, mapping ``self_input_name`` to the delayed task of the source
    block. A block may therefore only connect to blocks that appear earlier
    in ``blocks``.

    NOTE(review): a connection's ``source_output_name`` is not consulted here;
    the whole upstream task is passed as the input. Confirm this is intended.

    Args:
        blocks (list[Block]): The blocks of the workflow, sources before consumers.
        execute (bool): When True, compute the task of the last block.
        render (bool): When True, call ``visualize`` on the task of the last block.
        render_format (typing.Optional[str]): Passed to ``visualize`` as ``format``.
        render_filename (typing.Optional[str]): Passed to ``visualize`` as ``filename``.

    Returns:
        AnyDict: A dictionary with the keys ``"result"`` (the computed value, or
        None when ``execute`` is False) and ``"render_result"`` (the value
        returned by ``visualize``, or None when ``render`` is False). Both are
        None when ``blocks`` is empty.

    Raises:
        KeyError: If a connection references a block that has not been
            processed yet (unknown ID, or listed later in ``blocks``).
    """
    task_dict: dict[str, Delayed] = {}

    # Iterate through the blocks of the workflow
    for block in blocks:
        block_id = block.id
        block_method_id = block.method_id or "no_method"

        print("\t- Processing block", block_id, "with method", block_method_id)

        # Retrieve the function corresponding to the block type
        block_method = MethodManager.get_method_info(block_method_id).method

        # Resolve every connection to the delayed task of its source block
        connection_inputs: AnyDict = {}
        for connection in block.connections:
            source_block_id = connection.source_block_id
            try:
                upstream_task = task_dict[source_block_id]
            except KeyError:
                # Same exception type as before, but with enough context to
                # locate the faulty connection.
                raise KeyError(
                    f"Block {block_id!r} is connected to block {source_block_id!r}, "
                    "which is unknown or has not been processed yet; "
                    "source blocks must come before the blocks that consume them"
                ) from None
            connection_inputs[connection.self_input_name] = upstream_task

        # Merge block parameters with block connections (connections win on clashes)
        block_parameters: AnyDict = {**block.parameters, **connection_inputs}
        if block_parameters:
            print("Block parameters for block", block_id, ":", block_parameters)

        # Create the Dask task for the block
        task_dict[block_id] = delayed(block_method)(**block_parameters)

    # An empty workflow has no final task: nothing to compute or render.
    if not task_dict:
        return {
            "result": None,
            "render_result": None,
        }

    # Retrieve the final block (the most recently inserted key of the dict)
    final_task: Delayed = next(reversed(task_dict.values()))

    # Execute the Dask workflow
    result: typing.Optional[typing.Any] = None
    if execute:
        result = final_task.compute()  # type: ignore  # noqa: PGH003

    render_result: typing.Any = None
    if render:
        render_result = final_task.visualize(format=render_format, filename=render_filename)  # type: ignore  # noqa: PGH003

    return {
        "result": result,
        "render_result": render_result,
    }

curry.schedulers.dask

slurm_cluster module-attribute

slurm_cluster = SLURMCluster()

curry.server

app module-attribute

app = FastAPI()