
Flows

lume_services.flows.flow

Classes

MappedParameter

Bases: BaseModel

There are three types of mapped parameters: file, db, and raw.

file: File parameters are file outputs that will be loaded in downstream flows. Downstream loading must use the packaged load_file task in lume_services.tasks.file.

db: Database results ...

raw: Raw values are passed from task output to parameter input.

Attr

parent_flow_name (str): Parent flow holding origin of mapped parameter.
parent_task_name (str): Task whose result is mapped to the parameter.
map_type (Literal["file", "db", "raw"]): Type of mapping describing the parameters.

Attributes
parent_flow_name class-attribute
parent_flow_name: str
parent_task_name class-attribute
parent_task_name: str
map_type class-attribute
map_type: Literal['file', 'db', 'raw'] = 'raw'

RawMappedParameter

Bases: MappedParameter

RawMappedParameters describe parameter mappings where the result of a task is used as the input to a parameter.

Attr

parent_flow_name (str): Parent flow holding origin of mapped parameter.
parent_task_name (str): Task whose result is mapped to the parameter.
map_type (Literal["file", "db", "raw"] = "raw"): The "raw" map type describes the one-to-one result to parameter map.

Attributes
map_type class-attribute
map_type: str = Field('raw', const=True)

FileMappedParameter

Bases: MappedParameter

FileMappedParameters describe files passed between different flows. Files are saved as JSON representations describing the file type (and serialization) and filesystem information.

Attr

parent_flow_name (str): Parent flow holding origin of mapped parameter.
parent_task_name (str): Task whose result is mapped to the parameter.
map_type (Literal["file", "db", "raw"] = "file"): The "file" map type describes the

Attributes
map_type class-attribute
map_type: str = Field('file', const=True)

DBMappedParameter

Bases: MappedParameter

Attributes
map_type class-attribute
map_type: str = Field('db', const=True)
attribute class-attribute
attribute: str
attribute_index class-attribute
attribute_index: Optional[List[str]]

Flow

Bases: BaseModel

Interface to a workflow object.

Attributes:

name (str): Name of flow.
flow_id (Optional[str]): ID of flow as registered with Prefect. If running locally, this will be null.
project_name (Optional[str]): Name of Prefect project with which the flow is registered. If running locally, this will be null.
parameters (Optional[Dict[str, Parameter]]): Dictionary of Prefect parameters associated with the flow.
mapped_parameters (Optional[Dict[str, MappedParameter]]): Parameters to be collected from results of other flows.
task_slugs (Optional[Dict[str, str]]): Slugs of tasks associated with the Prefect flow, keyed by task name.
labels (List[str] = ["lume-services"]): Labels to assign to the flow when registering with the Prefect backend. Labels are used to assign the agents that will manage deployment.
image (str): Image inside which to run the flow if deploying to a remote backend.

Attributes
name class-attribute
name: str
flow_id class-attribute
flow_id: Optional[str]
project_name class-attribute
project_name: Optional[str]
prefect_flow class-attribute
prefect_flow: Optional[PrefectFlow]
parameters class-attribute
parameters: Optional[Dict[str, Parameter]]
mapped_parameters class-attribute
mapped_parameters: Optional[Dict[str, MappedParameter]]
task_slugs class-attribute
task_slugs: Optional[Dict[str, str]]
labels class-attribute
labels: List[str] = ['lume-services']
image class-attribute
image: str = 'build-test:latest'
Classes
Config
Attributes
arbitrary_types_allowed class-attribute
arbitrary_types_allowed = True
validate_assignment class-attribute
validate_assignment = True
Functions
validate_mapped_parameters
validate_mapped_parameters(v)
Source code in lume_services/flows/flow.py
@validator("mapped_parameters", pre=True)
def validate_mapped_parameters(cls, v):

    if v is None:
        return v

    mapped_parameters = {}

    for param_name, param in v.items():
        # persist instantiated params
        if isinstance(param, (MappedParameter,)):
            mapped_parameters[param_name] = param

        elif isinstance(param, (dict,)):
            # default raw
            if not param.get("map_type"):
                mapped_parameters[param_name] = RawMappedParameter(**param)

            else:
                mapped_param_type = _get_mapped_parameter_type(param["map_type"])
                mapped_parameters[param_name] = mapped_param_type(**param)

        else:
            raise ValueError(
                "Mapped parameters must be passed as instantiated "
                "MappedParameters or dictionaries"
            )

    return mapped_parameters
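
The dispatch performed by this validator can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the `*Sketch` classes, the `_TYPES` table (playing the role of `_get_mapped_parameter_type`), and the flow and task names are all hypothetical stand-ins.

```python
from dataclasses import dataclass


@dataclass
class MappedParameterSketch:
    """Hypothetical stdlib stand-in for MappedParameter."""

    parent_flow_name: str
    parent_task_name: str
    map_type: str = "raw"


@dataclass
class FileMappedParameterSketch(MappedParameterSketch):
    """Hypothetical stand-in for FileMappedParameter."""

    map_type: str = "file"


# Hypothetical lookup table standing in for _get_mapped_parameter_type.
_TYPES = {"raw": MappedParameterSketch, "file": FileMappedParameterSketch}


def coerce_mapped_parameters(v):
    """Keep instances, build objects from dicts, reject anything else."""
    if v is None:
        return v

    coerced = {}
    for name, param in v.items():
        if isinstance(param, MappedParameterSketch):
            coerced[name] = param
        elif isinstance(param, dict):
            # a missing or empty map_type falls back to "raw"
            cls = _TYPES[param.get("map_type") or "raw"]
            coerced[name] = cls(**param)
        else:
            raise ValueError("Mapped parameters must be instances or dictionaries")
    return coerced


params = coerce_mapped_parameters(
    {
        "x": {"parent_flow_name": "flow1", "parent_task_name": "task_a"},
        "y": {
            "parent_flow_name": "flow1",
            "parent_task_name": "task_b",
            "map_type": "file",
        },
    }
)
```

Here `params["x"]` defaults to the raw mapping because its dictionary carries no `map_type`, while `params["y"]` is built from the class registered for `"file"`.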
load_flow
load_flow(
    scheduling_service: SchedulingService = Provide[
        Context.scheduling_service
    ],
) -> None

Loads Prefect flow artifact from the backend.

Parameters:

scheduling_service (SchedulingService): Scheduling service. If not provided, uses injected service. Default: Provide[Context.scheduling_service]

Source code in lume_services/flows/flow.py
@inject
def load_flow(
    self,
    scheduling_service: SchedulingService = Provide[Context.scheduling_service],
) -> None:
    """Loads Prefect flow artifact from the backend.

    Args:
        scheduling_service (SchedulingService): Scheduling service. If not
            provided, uses injected service.
    """
    flow_dict = scheduling_service.load_flow(self.name, self.project_name)

    flow = flow_dict["flow"]

    # assign attributes
    self.prefect_flow = flow
    self.task_slugs = {task.name: task.slug for task in flow.get_tasks()}
    self.parameters = {parameter.name: parameter for parameter in flow.parameters()}
    self.flow_id = flow_dict["flow_id"]
register
register(
    scheduling_service: SchedulingService = Provide[
        Context.scheduling_service
    ],
) -> str

Register flow with SchedulingService backend.

Parameters:

scheduling_service (SchedulingService): Scheduling service. If not provided, uses injected service. Default: Provide[Context.scheduling_service]

Returns:

flow_id (str): ID of registered flow.

Source code in lume_services/flows/flow.py
@inject
def register(
    self,
    scheduling_service: SchedulingService = Provide[Context.scheduling_service],
) -> str:
    """Register flow with SchedulingService backend.

    Args:
        scheduling_service (SchedulingService): Scheduling service. If not
            provided, uses injected service.

    Returns:
        flow_id (str): ID of registered flow.

    """

    if self.prefect_flow is None:
        # attempt loading
        self.load_flow(scheduling_service=scheduling_service)

    self.flow_id = scheduling_service.register_flow(
        self.prefect_flow, self.project_name, labels=self.labels, image=self.image
    )

    self.parameters = {
        parameter.name: parameter for parameter in self.prefect_flow.parameters()
    }
    self.task_slugs = {
        task.name: task.slug for task in self.prefect_flow.get_tasks()
    }

    return self.flow_id
run
run(
    parameters,
    run_config,
    scheduling_service: SchedulingService = Provide[
        Context.scheduling_service
    ],
)

Run the flow.

Source code in lume_services/flows/flow.py
@inject
def run(
    self,
    parameters,
    run_config,
    scheduling_service: SchedulingService = Provide[Context.scheduling_service],
):
    """Run the flow."""
    if isinstance(scheduling_service.backend, (LocalBackend,)):
        if self.prefect_flow is None:
            # load with the same service that will run the flow
            self.load_flow(scheduling_service=scheduling_service)

        scheduling_service.run(
            parameters=parameters, run_config=run_config, flow=self.prefect_flow
        )

    elif isinstance(scheduling_service.backend, (ServerBackend,)):
        scheduling_service.run(
            parameters=parameters, run_config=run_config, flow_id=self.flow_id
        )
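
The backend dispatch in `run` can be sketched as follows, assuming stand-in classes in place of the real services: a local backend receives the flow object itself, while a server backend receives only the registered flow ID. Every name ending in `Sketch`, and the `run_flow` helper, is hypothetical and not part of lume_services.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


class LocalBackendSketch:
    """Hypothetical stand-in for LocalBackend."""


class ServerBackendSketch:
    """Hypothetical stand-in for ServerBackend."""


@dataclass
class SchedulingServiceSketch:
    """Hypothetical service that records calls instead of scheduling runs."""

    backend: object
    calls: List[Dict[str, Any]] = field(default_factory=list)

    def run(self, **kwargs):
        self.calls.append(kwargs)


def run_flow(flow_obj, flow_id, parameters, run_config, service):
    """Pass the flow object to a local backend, the flow ID to a server backend."""
    if isinstance(service.backend, LocalBackendSketch):
        service.run(parameters=parameters, run_config=run_config, flow=flow_obj)
    elif isinstance(service.backend, ServerBackendSketch):
        service.run(parameters=parameters, run_config=run_config, flow_id=flow_id)
    else:
        # design choice of this sketch: fail loudly on an unknown backend
        raise TypeError(f"Unsupported backend: {type(service.backend).__name__}")


local = SchedulingServiceSketch(backend=LocalBackendSketch())
run_flow("flow-object", None, {"x": 1}, None, local)

server = SchedulingServiceSketch(backend=ServerBackendSketch())
run_flow(None, "abc123", {"x": 1}, None, server)
```

Unlike the method above, which falls through without error when the backend matches neither type, the sketch raises, which makes a misconfigured service easier to spot.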
run_and_return
run_and_return(
    parameters,
    run_config,
    task_name: Optional[str],
    scheduling_service: SchedulingService = Provide[
        Context.scheduling_service
    ],
)

Run flow and return result. Result will reference either passed task name or the result of all tasks.

Source code in lume_services/flows/flow.py
@inject
def run_and_return(
    self,
    parameters,
    run_config,
    task_name: Optional[str],
    scheduling_service: SchedulingService = Provide[Context.scheduling_service],
):
    """Run flow and return result. Result will reference either passed task name or
    the result of all tasks.

    """
    if isinstance(scheduling_service.backend, (LocalBackend,)):
        if self.prefect_flow is None:
            # load with the same service that will run the flow
            self.load_flow(scheduling_service=scheduling_service)

        # return the result instead of discarding it
        return scheduling_service.run_and_return(
            parameters=parameters,
            run_config=run_config,
            flow=self.prefect_flow,
            task_name=task_name,
        )

    elif isinstance(scheduling_service.backend, (ServerBackend,)):
        return scheduling_service.run_and_return(
            parameters=parameters,
            run_config=run_config,
            flow_id=self.flow_id,
            task_name=task_name,
        )

FlowConfig

Bases: BaseModel

Attributes
image class-attribute
image: Optional[str]
env class-attribute
env: Optional[List[str]]

FlowRunConfig

Bases: BaseModel

Attributes
poll_interval class-attribute
poll_interval: timedelta = timedelta(seconds=10)
scheduled_start_time class-attribute
scheduled_start_time: Optional[datetime]
parameters class-attribute
parameters: Optional[Dict[str, Any]]
run_config class-attribute
run_config: Optional[RunConfig]
labels class-attribute
labels: Optional[List[str]]
run_name class-attribute
run_name: Optional[str]
Classes
Config
Attributes
arbitrary_types_allowed class-attribute
arbitrary_types_allowed = True

lume_services.flows.flow_of_flows

Classes

FlowOfFlows

Bases: Flow

Attributes
composing_flows class-attribute
composing_flows: dict
Classes
Config
Attributes
arbitrary_types_allowed class-attribute
arbitrary_types_allowed = True
Functions
validate
validate(values: dict)

Validate composing flow data against Prefect server.

Source code in lume_services/flows/flow_of_flows.py
@root_validator(pre=True)
def validate(cls, values: dict):
    """Validate composing flow data against Prefect server."""
    flows = {}

    scheduling_service = None
    if "scheduling_service" in values:
        scheduling_service = values.pop("scheduling_service")

    # validate composing flow existence
    composing_flows = values.get("composing_flows")

    if isinstance(composing_flows, (dict,)):
        pass

    # iterate to create dict
    elif isinstance(composing_flows, (list,)):
        for flow in values["composing_flows"]:

            # compose flow objects
            flow_obj = Flow(
                name=flow["name"],
                project_name=flow["project_name"],
                mapped_parameters=flow.get("mapped_parameters"),
            )

            # load Prefect parameters
            if scheduling_service is not None:
                flow_obj.load_flow(scheduling_service=scheduling_service)
            else:
                flow_obj.load_flow()

            flows[flow["name"]] = flow_obj

    # validate flow parameters
    for flow_name, flow in flows.items():
        if flow.mapped_parameters is not None:
            for parameter_name, parameter in flow.mapped_parameters.items():

                # validate parameter is in flow spec
                parameter_obj = flow.parameters.get(parameter_name)
                if parameter_obj is None:
                    raise ParameterNotInFlowError(parameter_name, flow_name)

                # validate parent flow is included in listed flows
                parent_flow = flows.get(parameter.parent_flow_name)
                if parent_flow is None:
                    raise ParentFlowNotInFlowsError(
                        parameter.parent_flow_name, list(flows.keys())
                    )

                # validate task is in the parent flow
                task = parent_flow.task_slugs.get(parameter.parent_task_name)

                if task is None:
                    raise TaskNotInFlowError(
                        parameter.parent_flow_name, parameter.parent_task_name
                    )

    values["composing_flows"] = flows

    return values
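
The three checks applied to each mapped parameter can be sketched over plain dictionaries. This is an illustration under stated assumptions: `MappingError` stands in for the library's `ParameterNotInFlowError`, `ParentFlowNotInFlowsError`, and `TaskNotInFlowError`, and the flow, task, and parameter names are hypothetical.

```python
class MappingError(ValueError):
    """Hypothetical stand-in for the library's three exception types."""


def check_mappings(flows):
    """Run the three checks over {flow_name: flow_description} dictionaries."""
    for flow_name, flow in flows.items():
        for param_name, mapped in (flow.get("mapped_parameters") or {}).items():
            # 1. the mapped parameter must be a parameter of the flow itself
            if param_name not in flow["parameters"]:
                raise MappingError(f"{param_name} is not a parameter of {flow_name}")

            # 2. the parent flow must be one of the composing flows
            parent = flows.get(mapped["parent_flow_name"])
            if parent is None:
                raise MappingError(
                    f"{mapped['parent_flow_name']} is not a composing flow"
                )

            # 3. the parent task must exist in the parent flow
            if mapped["parent_task_name"] not in parent["task_slugs"]:
                raise MappingError(
                    f"{mapped['parent_task_name']} is not a task of "
                    f"{mapped['parent_flow_name']}"
                )


flows = {
    "flow1": {"parameters": {"text"}, "task_slugs": {"save_file": "save_file-1"}},
    "flow2": {
        "parameters": {"file_rep"},
        "task_slugs": {},
        "mapped_parameters": {
            "file_rep": {"parent_flow_name": "flow1", "parent_task_name": "save_file"}
        },
    },
}
check_mappings(flows)  # passes silently when every mapping resolves
```

Dropping `flow1` from the dictionary would make the second check fail, since `flow2` maps a parameter to a parent flow that is no longer listed.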
compose
compose(
    image_name: str,
    image_tag: str = "latest",
    local: bool = False,
    scheduling_service: SchedulingService = Provide[
        Context.scheduling_service
    ],
) -> PrefectFlow

Compose a Prefect flow from the FlowOfFlows object. Uses the base image assigned to the FlowOfFlows object and builds a new Docker image containing the composite flow.

Parameters:

image_name (str): Name of generated image. Required.
image_tag (str): Tag of generated image. Default: 'latest'
local (bool): Whether to use local images for the base image. Default: False

Returns:

PrefectFlow: The composed Prefect flow.

Source code in lume_services/flows/flow_of_flows.py
def compose(
    self,
    image_name: str,
    image_tag: str = "latest",
    local: bool = False,
    scheduling_service: SchedulingService = Provide[Context.scheduling_service],
) -> PrefectFlow:
    """Compose Prefect flow from FlowOfFlows object. Uses base image assigned to
    the FlowOfFlows Object and builds a new Docker image containing the composite
    flow.


    Args:
        image_name (str): Name of generated image.
        image_tag (str): Tag of generated image.
        local (bool=False): Whether to use local images for the base image.


    Returns:
        PrefectFlow

    """

    # compose flow of flows
    with PrefectFlow(
        self.name,
        storage=Docker(
            base_image=self.image,
            image_name=image_name,
            image_tag=image_tag,
            local_image=local,
        ),
    ) as composed_flow:

        flow_runs = {}
        flow_waits = {}
        params = {}

        for i, (flow_name, flow) in enumerate(self.composing_flows.items()):

            # begin by creating parameters for all flow parameters
            flow_params = {}
            for param_name, param in flow.parameters.items():

                # update name and slug
                param.name = f"{flow_name}-{param_name}"
                param.slug = f"{flow_name}-{param_name}"
                params[param.name] = param

                # use original param name for flow config
                flow_params[param_name] = param

            # set up entry task
            if i == 0:
                flow_run = create_flow_run(
                    flow_id=flow.flow_id,
                    parameters=flow_params,
                    labels=flow.labels,
                )

            # setup other tasks
            elif i > 0:

                # create references to parameters
                upstream_flows = set()
                if flow.mapped_parameters is not None:

                    # update flow_params with mapping
                    for param_name, mapped_param in flow.mapped_parameters.items():
                        task_slug = self.composing_flows[
                            mapped_param.parent_flow_name
                        ].task_slugs[mapped_param.parent_task_name]

                        task_run_result = get_task_run_result(
                            flow_runs[mapped_param.parent_flow_name], task_slug
                        )

                        # raw results and file results use their values directly
                        if mapped_param.map_type in ["raw", "file"]:
                            flow.prefect_flow.replace(
                                flow_params.pop(param_name), task_run_result
                            )

                        # handle database results
                        elif mapped_param.map_type == "db":
                            load_db_result = LoadDBResult()
                            db_result = load_db_result(
                                task_run_result,
                                mapped_param.attribute,
                                attribute_index=mapped_param.attribute_index,
                            )
                            flow.prefect_flow.replace(
                                flow_params.pop(param_name), db_result
                            )

                            # add db result parameters to the task and create edge
                            for param in load_db_result.parameters:
                                flow.prefect_flow.add_task(param)
                                flow.prefect_flow.add_edge(
                                    param, load_db_result, mapped=True
                                )

                        else:
                            # should never reach if instantiating MappedParameter
                            mapped_param_types = get_args(
                                MappedParameter.__fields__["map_type"].type_
                            )
                            raise ValueError(
                                f"Map type {mapped_param.map_type} not supported. "
                                f"Allowed types: {mapped_param_types}."
                            )

                        # add flow to upstream
                        upstream_flows.add(mapped_param.parent_flow_name)

                # add creation of flow run to flow, even when nothing is mapped
                flow_run = create_flow_run(
                    flow_id=flow.flow_id,
                    parameters=flow_params,
                    labels=flow.labels,
                )

                # configure upstreams if any
                for upstream in upstream_flows:
                    flow_run.set_upstream(flow_waits[upstream])

            flow_wait = wait_for_flow_run(flow_run, raise_final_state=True)
            flow_runs[flow_name] = flow_run
            flow_waits[flow_name] = flow_wait

    # validate flow of flows
    composed_flow.validate()

    # assign to obj
    self.prefect_flow = composed_flow
    self.image = f"{image_name}:{image_tag}"

    return composed_flow
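
Two bookkeeping steps in `compose` are easy to miss in the listing: parameters are renamed to `{flow_name}-{param_name}`, and mapped parameters are removed from the set passed to `create_flow_run` because a parent task result feeds them instead. The sketch below reproduces only that bookkeeping over plain dictionaries. The `plan_composition` helper and all flow and parameter names are hypothetical, and unlike the method it treats the first flow the same as the rest.

```python
def plan_composition(composing_flows):
    """Return, per flow, the prefixed free parameters and the upstream flows."""
    plan = {}
    for flow_name, flow in composing_flows.items():
        mapped = flow.get("mapped_parameters") or {}
        plan[flow_name] = {
            # original name -> prefixed name, for parameters not fed by a parent task
            "parameters": {
                name: f"{flow_name}-{name}"
                for name in flow["parameters"]
                if name not in mapped
            },
            # flows whose runs must finish before this one starts
            "upstream": sorted({m["parent_flow_name"] for m in mapped.values()}),
        }
    return plan


composing_flows = {
    "flow1": {"parameters": ["text", "filename"]},
    "flow2": {
        "parameters": ["file_rep", "n"],
        "mapped_parameters": {
            "file_rep": {"parent_flow_name": "flow1", "parent_task_name": "save_file"}
        },
    },
}
plan = plan_composition(composing_flows)
```

In this example `flow2` keeps only `n` as a free parameter, exposed as `flow2-n`, and lists `flow1` as its single upstream flow.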
compose_and_register
compose_and_register()

Compose flow and register with project.

Returns:

str: Registered flow ID.

Source code in lume_services/flows/flow_of_flows.py
def compose_and_register(self):
    """Compose flow and register with project.

    Returns:
        str: Registered flow id

    """

    flow = self.compose()
    self.prefect_flow = flow
    return self.register(self.project_name)
from_yaml classmethod
from_yaml(
    yaml_obj,
    scheduling_service: SchedulingService = Provide[
        Context.scheduling_service
    ],
)
Source code in lume_services/flows/flow_of_flows.py
@classmethod
@inject
def from_yaml(
    cls,
    yaml_obj,
    scheduling_service: SchedulingService = Provide[Context.scheduling_service],
):
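    # only str / path-like input is treated as a file path; mappings pass through
    if isinstance(yaml_obj, (str, os.PathLike)) and os.path.exists(yaml_obj):
        with open(yaml_obj) as f:
            flow_of_flow_config = yaml.safe_load(f)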

    else:
        flow_of_flow_config = yaml_obj

    # now validate
    return cls(**flow_of_flow_config, scheduling_service=scheduling_service)
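
Since `from_yaml` unpacks the loaded configuration into the class constructor, the expected shape can be sketched as a Python dictionary. The keys below are the ones read by `validate` (`name`, `project_name`, `mapped_parameters`) and by the mapped parameter models. All values are hypothetical, and the final loop is only an illustrative check, not part of the library.

```python
# Hypothetical flow, project, task, and parameter names throughout.
flow_of_flows_config = {
    "name": "flow-of-flows",
    "project_name": "test",
    "composing_flows": [
        {"name": "flow1", "project_name": "test"},
        {
            "name": "flow2",
            "project_name": "test",
            "mapped_parameters": {
                "file_rep": {
                    "parent_flow_name": "flow1",
                    "parent_task_name": "save_file",
                    "map_type": "file",
                },
            },
        },
    ],
}

# validate indexes "name" and "project_name" directly, so both must be present;
# "mapped_parameters" is read with .get() and may be omitted.
for entry in flow_of_flows_config["composing_flows"]:
    missing = {"name", "project_name"} - entry.keys()
    if missing:
        raise KeyError(f"composing flow is missing keys: {sorted(missing)}")
```

The same structure written as YAML would be a top-level mapping with a `composing_flows` list of mappings.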