
Backends

lume_services.services.scheduling.backends.backend

Classes

RunConfig

Bases: BaseModel, ABC

Pydantic representation of the Prefect UniversalRun run config: https://docs.prefect.io/api/latest/run_configs.html#universalrun

Attributes:

Name Type Description
labels Optional[List[str]]

a list of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work

env Optional[dict]

Additional environment variables to set on the job

Attributes
labels class-attribute
labels: List[str] = ['lume-services']
env class-attribute
env: Optional[dict]
Functions
build abstractmethod
build() -> PrefectRunConfig

Method for converting object to Prefect RunConfig type.

Returns:

Type Description
PrefectRunConfig

PrefectRunConfig

Source code in lume_services/services/scheduling/backends/backend.py
@abstractmethod
def build(self) -> PrefectRunConfig:
    """Method for converting object to Prefect RunConfig type.

    Returns:
        PrefectRunConfig

    """
    ...
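The build() contract can be illustrated without Prefect or pydantic installed. A minimal sketch, where MinimalRunConfig and DictRunConfig are made-up stand-ins (not part of lume-services) for the base class and a concrete backend's run config:

```python
from abc import ABC, abstractmethod


class MinimalRunConfig(ABC):
    """Pared-down analogue of RunConfig: holds labels/env and defers
    conversion to the backend's native type to subclasses."""

    def __init__(self, labels=None, env=None):
        self.labels = labels if labels is not None else ["lume-services"]
        self.env = env

    @abstractmethod
    def build(self):
        """Convert this object to the backend's run-config type."""
        ...


class DictRunConfig(MinimalRunConfig):
    """Toy subclass that 'builds' to a plain dict instead of a Prefect type."""

    def build(self) -> dict:
        return {"labels": self.labels, "env": self.env}


config = DictRunConfig(env={"FOO": "bar"})
print(config.build())  # {'labels': ['lume-services'], 'env': {'FOO': 'bar'}}
```

Each concrete backend pairs one such subclass with its own Prefect RunConfig type; only build() differs.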

Backend

Bases: BaseModel, ABC

Abstract base class for Prefect backends. Backends handle Prefect interactions, including running flows, handling results, and registering flows with server backends.

Functions
create_project abstractmethod
create_project(project_name: str) -> None

Create a Prefect project. Backend implementations without a server connection should raise errors when this method is called.

Parameters:

Name Type Description Default
project_name str

Create a named Prefect project.

required
Source code in lume_services/services/scheduling/backends/backend.py
@abstractmethod
def create_project(self, project_name: str) -> None:
    """Create a Prefect project. Backend implementations without a server connection
    should raise errors when this method is called.

    Args:
        project_name (str): Create a named Prefect project.

    """
    ...
register_flow abstractmethod
register_flow(
    flow: Flow, project_name: str, image: Optional[str]
) -> str

Register a flow with Prefect. Backend implementations without a server connection should raise errors when this method is called.

Parameters:

Name Type Description Default
flow Flow

Prefect flow to register.

required
project_name str

Name of project to register flow to.

required
image str

Name of Docker image to run flow inside.

required

Returns:

Name Type Description
str str

ID of registered flow.

Source code in lume_services/services/scheduling/backends/backend.py
@abstractmethod
def register_flow(
    self,
    flow: Flow,
    project_name: str,
    image: Optional[str],
) -> str:
    """Register a flow with Prefect. Backend implementations without a server connection
    should raise errors when this method is called.

    Args:
        flow (Flow): Prefect flow to register.
        project_name (str): Name of project to register flow to.
        image (str): Name of Docker image to run flow inside.

    Returns:
        str: ID of registered flow.

    """
    ...
load_flow abstractmethod
load_flow(flow_name: str, project_name: str) -> dict

Load a Prefect flow object. Backend implementations without a server connection should raise errors when this method is called.

Parameters:

Name Type Description Default
flow_name str

Name of flow.

required
project_name str

Name of project flow is registered with.

required

Returns:

Name Type Description
dict dict

Dictionary with keys "flow_id" and "flow"

Source code in lume_services/services/scheduling/backends/backend.py
@abstractmethod
def load_flow(self, flow_name: str, project_name: str) -> dict:
    """Load a Prefect flow object. Backend implementations without a server connection
    should raise errors when this method is called.

    Args:
        flow_name (str): Name of flow.
        project_name (str): Name of project flow is registered with.

    Returns:
        dict: Dictionary with keys "flow_id" and "flow"

    """
    ...
run abstractmethod
run(
    parameters: Optional[Dict[str, Any]],
    run_config: Optional[RunConfig],
    **kwargs
) -> Union[str, None]

Run a flow. Does not return result. Implementations should cover instantiation of run_config from kwargs as well as backend-specific kwargs.

Parameters:

Name Type Description Default
parameters Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value

required
run_config Optional[RunConfig]

RunConfig object to configure the flow run.

required
**kwargs

Keyword arguments for RunConfig init and backend-specific execution.

{}

Returns:

Type Description
Union[str, None]

Union[str, None]: Return run_id in case of server backend, None in the case of local execution.

Raises:

Type Description
pydantic.ValidationError

Error validating run configuration.

ValueError

Value error on flow run

Source code in lume_services/services/scheduling/backends/backend.py
@abstractmethod
def run(
    self,
    parameters: Optional[Dict[str, Any]],
    run_config: Optional[RunConfig],
    **kwargs
) -> Union[str, None]:
    """Run a flow. Does not return result. Implementations should cover instantiation
    of run_config from kwargs as well as backend-specific kwargs.

    Args:
        parameters (Optional[Dict[str, Any]]): Dictionary mapping flow parameter
            name to value
        run_config (Optional[RunConfig]): RunConfig object to configure the flow run.
        **kwargs: Keyword arguments for RunConfig init and backend-specific
            execution.

    Returns:
        Union[str, None]: Return run_id in case of server backend, None in the case
            of local execution.

    Raises:
        pydantic.ValidationError: Error validating run configuration.
        ValueError: Value error on flow run
    """
    ...
run_and_return abstractmethod
run_and_return(
    parameters: Optional[Dict[str, Any]],
    run_config: Optional[RunConfig],
    task_name: Optional[str],
    **kwargs
) -> Any

Run a flow and return result. Implementations should cover instantiation of run_config from kwargs as well as backend-specific kwargs.

Parameters:

Name Type Description Default
parameters Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value

required
run_config Optional[RunConfig]

RunConfig object to configure the flow run.

required
task_name Optional[str]

Name of task to return result. If no task slug is passed, will return the flow result.

required
**kwargs

Keyword arguments for RunConfig init and backend-specific execution.

{}

Returns:

Name Type Description
Any Any

Result of flow run.

Raises:

Type Description
lume_services.errors.EmptyResultError

No result is associated with the flow.

pydantic.ValidationError

Error validating run configuration.

ValueError

Value error on flow run

Source code in lume_services/services/scheduling/backends/backend.py
@abstractmethod
def run_and_return(
    self,
    parameters: Optional[Dict[str, Any]],
    run_config: Optional[RunConfig],
    task_name: Optional[str],
    **kwargs
) -> Any:
    """Run a flow and return result. Implementations should cover instantiation of
    run_config from kwargs as well as backend-specific kwargs.

    Args:
        parameters (Optional[Dict[str, Any]]): Dictionary mapping flow parameter
            name to value
        run_config (Optional[RunConfig]): RunConfig object to configure the flow run.
        task_name (Optional[str]): Name of task to return result. If no task slug
            is passed, will return the flow result.
        **kwargs: Keyword arguments for RunConfig init and backend-specific
            execution.

    Returns:
        Any: Result of flow run.

    Raises:
        lume_services.errors.EmptyResultError: No result is associated with the
            flow.
        pydantic.ValidationError: Error validating run configuration.
        ValueError: Value error on flow run
    """
    ...

lume_services.services.scheduling.backends.local

Classes

LocalRunConfig

Bases: RunConfig

Local run configuration. If no directory is found at the filepath passed as working_dir, an error will be raised.

Attributes:

Name Type Description
labels Optional[List[str]]

a list of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work

env Optional[Dict[str, str]]

Dictionary of environment variables to use for run

working_dir Optional[str]

Working directory

Attributes
env class-attribute
env: Optional[Dict[str, str]]
working_dir class-attribute
working_dir: Optional[str] = str(os.getcwd())
Functions
validate
validate(v)

Pydantic validator checking working directory existence

Source code in lume_services/services/scheduling/backends/local.py
@validator("working_dir", pre=True)
def validate(cls, v):
    """Pydantic validator checking working directory existence"""
    if not os.path.isdir(v):
        raise FileNotFoundError(f"No directory found at {v}")

    return v
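The validator reduces to a plain directory-existence check. A standalone sketch (check_working_dir is a hypothetical helper, not part of the library):

```python
import os
import tempfile


def check_working_dir(path: str) -> str:
    """Standalone mirror of the working_dir validator: accept only
    paths that point at an existing directory."""
    if not os.path.isdir(path):
        raise FileNotFoundError(f"No directory found at {path}")
    return path


existing = tempfile.gettempdir()
assert check_working_dir(existing) == existing

try:
    check_working_dir("/definitely/not/a/dir")
except FileNotFoundError as err:
    print(err)  # No directory found at /definitely/not/a/dir
```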
build
build() -> LocalRun

Method for converting to Prefect RunConfig type LocalRun.

Returns:

Type Description
LocalRun

LocalRun

Source code in lume_services/services/scheduling/backends/local.py
def build(self) -> LocalRun:
    """Method for converting to Prefect RunConfig type LocalRun.

    Returns:
        LocalRun

    """
    return LocalRun(**self.dict(exclude_none=True))
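build() forwards self.dict(exclude_none=True) into LocalRun, so fields left unset never override Prefect's own defaults. The filtering can be sketched with plain dicts (exclude_none here is a stand-in for the pydantic behavior):

```python
def exclude_none(fields: dict) -> dict:
    """Drop None-valued entries, mimicking pydantic's
    .dict(exclude_none=True) as used by LocalRunConfig.build."""
    return {key: value for key, value in fields.items() if value is not None}


fields = {"labels": ["lume-services"], "env": None, "working_dir": "/tmp"}
print(exclude_none(fields))  # {'labels': ['lume-services'], 'working_dir': '/tmp'}
```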

LocalBackend

Bases: Backend

Backend used for local execution. This backend will raise errors on any function calls requiring registration with the Prefect server.

Attributes:

Name Type Description
run_config Optional[LocalRunConfig]

Default configuration object for a given run.

Functions
run
run(
    data: Dict[str, Any],
    run_config: LocalRunConfig = None,
    *,
    flow: Flow,
    **kwargs
) -> None

Run flow execution. Does not return result.

Parameters:

Name Type Description Default
labels Optional[List[str]]

a list of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work.

required
env Optional[dict]

Additional environment variables to set on the job

required
data Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value.

required
run_config Optional[LocalRunConfig]

LocalRunConfig object to configure the flow run.

None
flow Flow

Prefect flow to execute.

required
**kwargs

Keyword arguments to instantiate the LocalRunConfig.

{}

Raises:

Type Description
pydantic.ValidationError

Error validating run configuration.

Source code in lume_services/services/scheduling/backends/local.py
def run(
    self,
    data: Dict[str, Any],
    run_config: LocalRunConfig = None,
    *,
    flow: Flow,
    **kwargs
) -> None:
    """Run flow execution. Does not return result.

    Args:
        labels (Optional[List[str]]): a list of labels to apply to this run
            config. Labels are string identifiers used by Prefect Agents for
            selecting valid flow runs when polling for work.
        env (Optional[dict]): Additional environment variables to set on the job
        data (Optional[Dict[str, Any]]): Dictionary mapping flow parameter name to
            value.
        run_config (Optional[LocalRunConfig]): LocalRunConfig object to configure
            flow run.
        flow (Flow): Prefect flow to execute.
        **kwargs: Keyword arguments to instantiate the LocalRunConfig.

    Raises:
        pydantic.ValidationError: Error validating run configuration.

    """

    if run_config is not None and len(kwargs):
        warnings.warn(
            "Both run_config and kwargs passed to LocalBackend.run. Flow\
            will execute using passed run_config."
        )

    if run_config is None:
        run_config = LocalRunConfig(**kwargs)

    # convert to Prefect LocalRun
    prefect_run_config = run_config.build()

    # apply run config
    flow.run_config = prefect_run_config
    flow.run(parameters=data)
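The run_config/kwargs precedence in run can be sketched in isolation (resolve_run_config is a made-up stand-in; a plain dict substitutes for LocalRunConfig):

```python
import warnings


def resolve_run_config(run_config=None, **kwargs):
    """Mirror of LocalBackend.run's precedence rule: an explicitly passed
    run_config always wins; kwargs only build a config when none is given."""
    if run_config is not None and kwargs:
        warnings.warn(
            "Both run_config and kwargs passed; the passed run_config wins."
        )
        return run_config
    if run_config is None:
        run_config = dict(kwargs)  # stand-in for LocalRunConfig(**kwargs)
    return run_config


explicit = {"working_dir": "/tmp"}
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    chosen = resolve_run_config(run_config=explicit, env={"FOO": "bar"})

print(chosen)       # {'working_dir': '/tmp'}
print(len(caught))  # 1
```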
run_and_return
run_and_return(
    data: Dict[str, Any],
    run_config: LocalRunConfig = None,
    task_name: str = None,
    *,
    flow: Flow,
    **kwargs
) -> Any

Run flow execution and return result.

Parameters:

Name Type Description Default
data Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value.

required
run_config Optional[LocalRunConfig]

LocalRunConfig object to configure the flow run.

None
task_name Optional[str]

Name of task to return result. If no task slug is passed, will return the flow result.

None
flow Flow

Prefect flow to execute.

required
**kwargs

Keyword arguments to instantiate the LocalRunConfig.

{}

Raises:

Type Description
pydantic.ValidationError

Error validating run configuration.

EmptyResultError

No result is associated with the flow.

TaskNotCompletedError

Result reference task was not completed.

TaskNotInFlowError

Provided task slug not in flow.

Source code in lume_services/services/scheduling/backends/local.py
def run_and_return(
    self,
    data: Dict[str, Any],
    run_config: LocalRunConfig = None,
    task_name: str = None,
    *,
    flow: Flow,
    **kwargs
) -> Any:
    """Run flow execution and return result.

    Args:
        data (Optional[Dict[str, Any]]): Dictionary mapping flow parameter name to
            value.
        run_config (Optional[LocalRunConfig]): LocalRunConfig object to configure
            flow run.
        task_name (Optional[str]): Name of task to return result. If no task slug
            is passed, will return the flow result.
        flow (Flow): Prefect flow to execute.
        **kwargs: Keyword arguments to instantiate the LocalRunConfig.

    Raises:
        pydantic.ValidationError: Error validating run configuration.
        EmptyResultError: No result is associated with the flow.
        TaskNotCompletedError: Result reference task was not completed.
        TaskNotInFlowError: Provided task slug not in flow.

    """
    if run_config is not None and len(kwargs):
        warnings.warn(
            "Both run_config and kwargs passed to LocalBackend.run. Flow\
            will execute using passed run_config."
        )

    if run_config is None:
        run_config = LocalRunConfig(**kwargs)

    # convert to Prefect LocalRun
    prefect_run_config = run_config.build()

    # apply run config
    flow.run_config = prefect_run_config

    try:
        flow_run = flow.run(parameters=data)
        if flow_run.is_failed():
            logger.exception(flow_run.message)
            raise FlowFailedError(
                flow_id="local_flow",
                flow_run_id="local_flow_run",
                exception_message=flow_run.message,
            )

    except Exception as e:
        # generic exceptions have no .message attribute; use str(e)
        logger.exception(str(e))
        raise FlowFailedError(
            flow_id="local_flow",
            flow_run_id="local_flow_run",
            exception_message=str(e),
        )

    result = flow_run.result

    if result is None:
        raise EmptyResultError

    task_to_slug_map = {task: slug for task, slug in flow.slugs.items()}
    # slug_to_task_map = {slug: task for task, slug in flow.slugs.items()}

    # account for task slug
    if task_name is not None:
        # get tasks
        tasks = flow.get_tasks(name=task_name)
        if not len(tasks):
            raise TaskNotInFlowError(
                flow_name=flow.name, project_name="local", task_name=task_name
            )

        results = []
        for task in tasks:
            slug = task_to_slug_map.get(task)
            state = result[task]
            if not state.is_successful():
                raise TaskNotCompletedError(
                    slug, flow_id="local_flow", flow_run_id="local_flow_run"
                )

            res = state.result
            if res is None:
                raise EmptyResultError("local_flow", "local_flow_run", slug)

            results.append(state.result)

        if len(tasks) == 1:
            return results[0]

        else:
            return results

    # else return dict of task slug to value
    else:
        return {
            slug: result[task].result for task, slug in task_to_slug_map.items()
        }
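Stripped of Prefect state objects, the result handling above reduces to: no task name returns every result keyed by slug; one matching task unwraps to its result; several matches return a list. A toy sketch (extract_results and the slug scheme are invented for illustration; the real code matches Task objects via flow.get_tasks):

```python
def extract_results(task_results, task_name=None):
    """Toy version of run_and_return's result handling. task_results maps
    task slug -> result; here a task 'matches' when its slug starts with
    task_name."""
    if task_name is None:
        # No task requested: return every result keyed by slug.
        return dict(task_results)
    matches = [r for slug, r in task_results.items() if slug.startswith(task_name)]
    if not matches:
        raise KeyError(f"task {task_name!r} not in flow")
    # A single matching task unwraps to its result; several return a list.
    return matches[0] if len(matches) == 1 else matches


results = {"load-1": 10, "train-1": 0.95, "train-2": 0.97}
print(extract_results(results, "load"))   # 10
print(extract_results(results, "train"))  # [0.95, 0.97]
```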
create_project
create_project(*args, **kwargs) -> None

Raise LocalBackendError for calls to the create_project server-type method.

Raises:

Type Description
LocalBackendError

Indicates that a server-backend operation has been executed against the LocalBackend. Server-backend operations include flow registration and remote execution.

Source code in lume_services/services/scheduling/backends/local.py
def create_project(self, *args, **kwargs) -> None:
    """Raise LocalBackendError for calls to the create_project server-type method.

    Raises:
        LocalBackendError: Indicates that a server-backend operation has been
            executed against the LocalBackend. Server-backend operations include
            flow registration and remote execution.

    """
    raise LocalBackendError()
register_flow
register_flow(*args, **kwargs) -> None

Raise LocalBackendError for calls to the register_flow server-type method.

Raises:

Type Description
LocalBackendError

Indicates that a server-backend operation has been executed against the LocalBackend. Server-backend operations include flow registration and remote execution.

Source code in lume_services/services/scheduling/backends/local.py
def register_flow(self, *args, **kwargs) -> None:
    """Raise LocalBackendError for calls to register_flow server-type method.

    Raises:
        LocalBackendError: Indicates that a server-backend operation has been
            executed against the LocalBackend. Server-backend operations include
            flow registration and remote execution.


    """
    raise LocalBackendError()
load_flow
load_flow(*args, **kwargs) -> None

Raise LocalBackendError for calls to the load_flow server-type method.

Raises:

Type Description
LocalBackendError

Indicates that a server-backend operation has been executed against the LocalBackend. Server-backend operations include flow registration and remote execution.

Source code in lume_services/services/scheduling/backends/local.py
def load_flow(self, *args, **kwargs) -> None:
    """Raise LocalBackendError for calls to load_flow server-type method.

    Raises:
        LocalBackendError: Indicates that a server-backend operation has been
            executed against the LocalBackend. Server-backend operations include
            flow registration and remote execution.

    """
    raise LocalBackendError()

lume_services.services.scheduling.backends.server

Classes

PrefectAgentConfig

Bases: BaseModel

Attributes
host class-attribute
host: str = 'http://localhost'
host_port class-attribute
host_port: str = '5000'

PrefectServerConfig

Bases: BaseModel

Attributes
tag class-attribute
tag: str = 'core-1.2.4'
host class-attribute
host: str = 'http://localhost'
host_port class-attribute
host_port: str = '4200'
host_ip class-attribute
host_ip: str = '127.0.0.1'

PrefectUIConfig

Bases: BaseModel

Attributes
host class-attribute
host: str = 'http://localhost'
host_port class-attribute
host_port: str = '8080'
host_ip class-attribute
host_ip: str = '127.0.0.1'
apollo_url class-attribute
apollo_url: str = 'http://localhost:4200/graphql'

PrefectTelemetryConfig

Bases: BaseModel

Attributes
enabled class-attribute
enabled: bool = True

PrefectConfig

Bases: BaseModel

Attributes
server class-attribute
server: PrefectServerConfig = PrefectServerConfig()
ui class-attribute
ui: PrefectUIConfig = PrefectUIConfig()
telemetry class-attribute
telemetry: PrefectTelemetryConfig = PrefectTelemetryConfig()
agent class-attribute
agent: PrefectAgentConfig = PrefectAgentConfig()
home_dir class-attribute
home_dir: str = '~/.prefect'
debug class-attribute
debug: bool = False
backend class-attribute
backend: Literal['server', 'cloud'] = 'server'
Functions
apply
apply()
Source code in lume_services/services/scheduling/backends/server.py
def apply(self):
    prefect_config.update(
        home_dir=self.home_dir, debug=self.debug, backend=self.backend
    )
    # must set endpoint because referenced by client
    prefect_config.server.update(
        endpoint=f"{self.server.host}:{self.server.host_port}", **self.server.dict()
    )
    prefect_config.server.ui.update(**self.ui.dict())
    prefect_config.server.telemetry.update(**self.telemetry.dict())
    # client requires api set
    prefect_config.cloud.update(api=f"{self.server.host}:{self.server.host_port}")
    save_backend(self.backend)
    return prefect_config
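apply() derives the same host:port endpoint string for both server.endpoint and cloud.api, since the Prefect client reads both. The string construction can be sketched as (build_endpoints is a hypothetical helper):

```python
def build_endpoints(host: str, host_port: str) -> dict:
    """Sketch of the endpoint strings apply() derives from the server
    config; the keys name the Prefect config entries being set."""
    endpoint = f"{host}:{host_port}"
    return {
        "server.endpoint": endpoint,  # referenced by the Prefect client
        "cloud.api": endpoint,        # client also requires api to be set
    }


print(build_endpoints("http://localhost", "4200"))
# {'server.endpoint': 'http://localhost:4200', 'cloud.api': 'http://localhost:4200'}
```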

ServerBackend

Bases: Backend

Abstract backend used for connecting to a Prefect server.

Prefect manages its own contexts for registering flow objects and related operations. This introduces issues with client management: even after the Prefect configuration is set in the PrefectConfig.apply method, the original cloud context is still used to construct the client. For this reason, all clients are constructed inside a context built from the backend configuration.

Attributes:

Name Type Description
config PrefectConfig

Instantiated PrefectConfig object describing connection to Prefect server.

default_image str

Default image used for registering flow storage.

Attributes
config class-attribute
config: PrefectConfig
default_image class-attribute
default_image: str = Field(None, alias='image')
Functions
run_config_type
run_config_type() -> PrefectRunConfig

Abstract property that must return the Prefect RunConfig type pertinent to the Backend implementation.

Source code in lume_services/services/scheduling/backends/server.py
@abstractproperty
def run_config_type(self) -> PrefectRunConfig:
    """Abstract property that must return the Prefect RunConfig type pertinent to
    the Backend implementation.

    """
    ...
create_project
create_project(project_name: str) -> None

Create a Prefect project.

Parameters:

Name Type Description Default
project_name str

Create a named Prefect project.

required

Raises:

Type Description
prefect.errors.ClientError

if the GraphQL query is bad for any reason

Source code in lume_services/services/scheduling/backends/server.py
def create_project(self, project_name: str) -> None:
    """Create a Prefect project.

    Args:
        project_name (str): Create a named Prefect project.

    Raises:
        prefect.errors.ClientError: if the GraphQL query is bad for any reason

    """
    with prefect.context(config=self.config.apply()):
        client = Client()
        client.create_project(project_name=project_name)
register_flow
register_flow(
    flow: Flow,
    project_name: str,
    image: str = None,
    labels: List[str] = None,
    idempotency_key: str = None,
    version_group_id: str = None,
    build: bool = True,
    no_url: bool = False,
    set_schedule_active: bool = True,
) -> str

Register a flow with Prefect.

Parameters:

Name Type Description Default
flow Flow

Prefect flow to register

required
project_name str

Name of project to register flow to

required
image str

Name of Docker image to run flow inside. If not specified, this will use the default image packaged with this repository.

None
build bool

Whether the flow's storage should be built prior to serialization. By default lume-services flows use the same image for execution, with additional packages for installation configured at runtime.

True
labels Optional[List[str]]

A list of labels to add to this Flow.

None
idempotency_key Optional[str]

a key that, if matching the most recent registration call for this flow group, will prevent the creation of another flow version and return the existing flow id instead.

None
version_group_id Optional[str]

The UUID version group ID to use for versioning this Flow in Cloud. If not provided, the version group ID associated with this Flow's project and name will be used.

None
no_url Optional[bool]

If True, the stdout from this function will not contain the URL link to the newly-registered flow in the UI

False
set_schedule_active Optional[bool]

If False, will set the schedule to inactive in the database to prevent auto-scheduling runs (if the Flow has a schedule)

True

Returns:

Name Type Description
str str

ID of registered flow

Notes

prefect registration idempotency key omitted and version group...

Raises:

Type Description
prefect.errors.ClientError

if the GraphQL query is bad for any reason

Source code in lume_services/services/scheduling/backends/server.py
def register_flow(
    self,
    flow: Flow,
    project_name: str,
    image: str = None,
    labels: List[str] = None,
    idempotency_key: str = None,
    version_group_id: str = None,
    build: bool = True,
    no_url: bool = False,
    set_schedule_active: bool = True,
) -> str:
    """Register a flow with Prefect.

    Args:
        flow (Flow): Prefect flow to register
        project_name (str): Name of project to register flow to
        image (str): Name of Docker image to run flow inside. If not specified,
            this will use the default image packaged with this repository.
        build (bool): Whether the flow's storage should be built prior to
            serialization. By default lume-services flows use the same
            image for execution with additional packages passed for installation
            configured at runtime.
        labels (Optional[List[str]]): A list of labels to add to this Flow.
        idempotency_key (Optional[str]): a key that, if matching the most recent
            registration call for this flow group, will prevent the creation of
            another flow version and return the existing flow id instead.
        version_group_id (Optional[str]): The UUID version group ID to use for
            versioning this Flow in Cloud. If not provided, the version group ID
            associated with this Flow's project and name will be used.
        no_url (Optional[bool]): If True, the stdout from this function will not
            contain the URL link to the newly-registered flow in the UI
        set_schedule_active (Optional[bool]): If False, will set the schedule to
            inactive in the database to prevent auto-scheduling runs (if the Flow
            has a schedule)

    Returns:
        str: ID of registered flow

    Notes:
        prefect registration idempotency key omitted and version group...

    Raises:
        prefect.errors.ClientError: if the GraphQL query is bad for any reason

    """
    if not image:
        image = self.default_image

    # configure run config for backend
    run_config = self.run_config_type(image=image)
    flow.run_config = run_config.build()
    if labels is not None:
        logger.info(
            "Flow run config is not empty. Clearing existing labels and assigning \
                new."
        )
        flow.run_config.labels = set(labels)

    flow.run_config.image_tag = image

    with prefect.context(config=self.config.apply()):
        flow_id = flow.register(
            project_name=project_name,
            build=build,
            set_schedule_active=set_schedule_active,
            version_group_id=version_group_id,
            no_url=no_url,
            idempotency_key=idempotency_key,
        )

    return flow_id
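The image and label resolution at the top of register_flow can be sketched without Prefect (resolve_registration and the default image name are invented for illustration):

```python
def resolve_registration(image=None, labels=None, default_image="example/flow:latest"):
    """Toy version of register_flow's image/label resolution; the default
    image name is a made-up placeholder for the packaged default."""
    image = image or default_image
    run_config = {"image": image, "labels": {"lume-services"}}
    if labels is not None:
        # Explicit labels replace whatever the run config already carried.
        run_config["labels"] = set(labels)
    return run_config


print(resolve_registration(labels=["gpu"]))
# {'image': 'example/flow:latest', 'labels': {'gpu'}}
```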
load_flow
load_flow(flow_name: str, project_name: str) -> dict

Load a Prefect flow object.

Parameters:

Name Type Description Default
flow_name str

Name of flow.

required
project_name str

Name of project flow is registered with.

required

Returns:

Name Type Description
dict dict

Dictionary with keys "flow_id" and "flow"

Raises:

Type Description
prefect.errors.ClientError

if the GraphQL query is bad for any reason

Source code in lume_services/services/scheduling/backends/server.py
def load_flow(self, flow_name: str, project_name: str) -> dict:
    """Load a Prefect flow object.

    Args:
        flow_name (str): Name of flow.
        project_name (str): Name of project flow is registered with.

    Returns:
        dict: Dictionary with keys "flow_id" and "flow"

    Raises:
        prefect.errors.ClientError: if the GraphQL query is bad for any reason

    """

    with prefect.context(config=self.config.apply()):
        flow_view = FlowView.from_flow_name(
            flow_name, project_name=project_name, last_updated=True
        )
        return {"flow_id": flow_view.flow_id, "flow": flow_view.flow}
run
run(
    parameters: Dict[str, Any] = None,
    run_config: RunConfig = None,
    *,
    flow_id: str,
    **kwargs
) -> str

Create a flow run for a flow.

Parameters:

Name Type Description Default
flow_id str

Flow identifier

required
parameters Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value

None
run_config Optional[RunConfig]

RunConfig object to configure the flow run.

None
**kwargs

Keyword arguments to instantiate the RunConfig.

{}

Returns:

Name Type Description
str str

ID of flow run

Raises:

Type Description
prefect.errors.ClientError

if the GraphQL query is bad for any reason

docker.errors.DockerException

Run configuration error for docker api.

pydantic.ValidationError

Error validating run configuration.

ValueError

Value error on flow run

Source code in lume_services/services/scheduling/backends/server.py
def run(
    self,
    parameters: Dict[str, Any] = None,
    run_config: RunConfig = None,
    *,
    flow_id: str,
    **kwargs,
) -> str:
    """Create a flow run for a flow.

    Args:
        flow_id (str): Flow identifier
        parameters (Optional[Dict[str, Any]]): Dictionary mapping flow parameter
            name to value
        run_config (Optional[RunConfig]): RunConfig object to configure the flow run.
        **kwargs: Keyword arguments to instantiate the RunConfig.

    Returns:
        str: ID of flow run

    Raises:
        prefect.errors.ClientError: if the GraphQL query is bad for any reason
        docker.errors.DockerException: Run configuration error for docker api.
        pydantic.ValidationError: Error validating run configuration.
        ValueError: Value error on flow run
    """
    if run_config is not None and len(kwargs):
        warnings.warn(
            "Both run_config and kwargs passed to Backend.run. Flow\
            will execute using passed run_config."
        )

    # convert LUME-services run config to appropriate Prefect RunConfig object
    if run_config is None:
        run_config = self.run_config_type(**kwargs)

    prefect_run_config = run_config.build()

    with prefect.context(config=self.config.apply()):
        client = Client()
        flow_run_id = client.create_flow_run(
            flow_id=flow_id, parameters=parameters, run_config=prefect_run_config
        )

    return flow_run_id
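The precedence between an explicit `run_config` and keyword arguments can be illustrated with a standalone sketch. The `resolve_run_config` helper below is hypothetical (the real method goes on to call `run_config.build()` and create a Prefect flow run), but it mirrors the resolution logic shown above:

```python
import warnings


def resolve_run_config(run_config=None, run_config_type=dict, **kwargs):
    """Mimic Backend.run's precedence: an explicit run_config wins over kwargs."""
    if run_config is not None and len(kwargs):
        warnings.warn(
            "Both run_config and kwargs passed; the passed run_config is used."
        )
        return run_config
    if run_config is None:
        # no explicit config: build one from the keyword arguments
        run_config = run_config_type(**kwargs)
    return run_config


# An explicit run_config takes precedence over keyword arguments
cfg = resolve_run_config(run_config={"image": "a"}, image="b")
print(cfg["image"])  # → a
```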
run_and_return
run_and_return(
    parameters: Dict[str, Any] = None,
    run_config: RunConfig = None,
    task_name: str = None,
    *,
    flow_id: str,
    timeout: timedelta = timedelta(minutes=1),
    cancel_on_timeout: bool = True,
    **kwargs
)

Create a flow run for a flow and return the result.

Parameters:

Name Type Description Default
parameters Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value

None
run_config Optional[RunConfig]

RunConfig object to configure flow run.

None
task_name Optional[str]

Name of task to return result. If no task slug is passed, will return the flow result.

None
flow_id str

ID of flow to run.

required
timeout timedelta

Time before stopping flow execution.

timedelta(minutes=1)
cancel_on_timeout bool

Whether to cancel execution on timeout error.

True
**kwargs

Keyword arguments to instantiate the RunConfig.

{}

Raises:

Type Description
EmptyResultError

No result is associated with the flow.

TaskNotCompletedError

Result reference task was not completed.

RuntimeError

Flow did not complete within given timeout.

prefect.errors.ClientError

If the GraphQL query is bad for any reason

docker.errors.DockerException

Run configuration error for docker api.

pydantic.ValidationError

Error validating run configuration.

TaskNotInFlowError

Provided task slug not in flow.

ValueError

Value error on flow run

Source code in lume_services/services/scheduling/backends/server.py
def run_and_return(
    self,
    parameters: Dict[str, Any] = None,
    run_config: RunConfig = None,
    task_name: str = None,
    *,
    flow_id: str,
    timeout: timedelta = timedelta(minutes=1),
    cancel_on_timeout: bool = True,
    **kwargs,
):
    """Create a flow run for a flow and return the result.

    Args:
        parameters (Optional[Dict[str, Any]]): Dictionary mapping flow parameter
            name to value
        run_config (Optional[RunConfig]): RunConfig object to configure flow run.
        task_name (Optional[str]): Name of task to return result. If no task slug
            is passed, will return the flow result.
        flow_id (str): ID of flow to run.
        timeout (timedelta): Time before stopping flow execution.
        cancel_on_timeout (bool): Whether to cancel execution on timeout
            error.
        **kwargs: Keyword arguments to instantiate the RunConfig.

    Raises:
        EmptyResultError: No result is associated with the flow.
        TaskNotCompletedError: Result reference task was not completed.
        RuntimeError: Flow did not complete within given timeout.
        prefect.errors.ClientError: if the GraphQL query is bad for any reason
        docker.errors.DockerException: Run configuration error for docker api.
        pydantic.ValidationError: Error validating run configuration.
        TaskNotInFlowError: Provided task slug not in flow.
        ValueError: Value error on flow run
    """
    if run_config is not None and len(kwargs):
        warnings.warn(
            "Both run_config and kwargs passed to Backend.run. Flow\
            will execute using passed run_config."
        )

    # convert LUME-services run config to appropriate Prefect RunConfig object
    if run_config is None:
        run_config = self.run_config_type(**kwargs)

    prefect_run_config = run_config.build()

    logger.info(
        "Creating Prefect flow run for %s with parameters %s and run_config %s",
        flow_id,
        parameters,
        run_config.json(),
    )

    with prefect.context(config=self.config.apply()):
        client = Client()

        flow_run_id = client.create_flow_run(
            flow_id=flow_id, parameters=parameters, run_config=prefect_run_config
        )
        flow_view = FlowView.from_flow_id(flow_id)

        # watch flow run and stream logs until timeout
        try:
            for log in watch_flow_run(
                flow_run_id,
                stream_states=True,
                stream_logs=True,
                max_duration=timeout,
            ):
                logger.info(log)
        except RuntimeError as err:
            if cancel_on_timeout:
                client.cancel_flow_run(flow_run_id=flow_run_id)
            raise err

        logger.debug("Watched flow completed.")
        flow_run = FlowRunView.from_flow_run_id(flow_run_id)

        # check state
        if flow_run.state.is_failed():
            logger.exception(flow_run.state.message)
            raise FlowFailedError(
                flow_id=flow_run.flow_id,
                flow_run_id=flow_run.flow_run_id,
                exception_message=flow_run.state.message,
            )

        task_runs = flow_run.get_all_task_runs()

        # populate tasks
        results = {}
        for task_run in task_runs:
            slug = task_run.task_slug
            if not task_run.state.is_successful():
                raise TaskNotCompletedError(slug, flow_id, flow_run_id)

            try:
                res = task_run.get_result()
            # location is not set, no result
            except ValueError:
                res = None

            results[slug] = res

    # get task run
    if task_name is not None:
        # filter tasks based on name
        task_runs = {
            slug: res for slug, res in results.items() if task_name in slug
        }
        logger.debug(task_runs)

        if not len(task_runs):
            raise TaskNotInFlowError(
                flow_name=flow_view.name,
                project_name=flow_view.project_name,
                task_name=task_name,
            )

        if len(task_runs) == 1:
            res = list(task_runs.values())[0]
            if res is None:
                raise EmptyResultError(flow_id, flow_run_id, slug)

            return res

        else:
            return task_runs

    # assume flow result, return all results
    else:
        return results
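The result-selection step at the end of `run_and_return` can be exercised in isolation. The sketch below mirrors that logic with plain Python, using `KeyError` and `ValueError` as stand-ins for `TaskNotInFlowError` and `EmptyResultError`:

```python
def select_task_results(results, task_name=None):
    """Mimic run_and_return's result selection: filter task results by name
    substring, returning a single result when exactly one task matches."""
    if task_name is None:
        # no task requested: return all task results
        return results
    matches = {slug: res for slug, res in results.items() if task_name in slug}
    if not matches:
        raise KeyError(f"Task {task_name!r} not in flow")
    if len(matches) == 1:
        res = next(iter(matches.values()))
        if res is None:
            raise ValueError("Empty result for matched task")
        return res
    # several task runs matched the name: return the filtered mapping
    return matches


results = {"load_data-1": [1, 2], "train-1": "model_a", "train-2": "model_b"}
print(select_task_results(results, "load_data"))  # → [1, 2]
```

Note that because matching is by substring on the task slug, a short `task_name` can match several runs, in which case the filtered mapping is returned rather than a single result.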

lume_services.services.scheduling.backends.docker

Classes

DockerRunConfig

Bases: RunConfig

Pydantic representation of a Docker Prefect run configuration: https://docs.prefect.io/api/latest/run_configs.html#dockerrun

Attributes:

Name Type Description
labels Optional[List[str]]

a list of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work

env Optional[dict]

Additional environment variables to set on the job

image str

Tag of image in which flow should run.

host_config Optional[Dict[str, Any]]

Dictionary representing runtime args to be passed to Docker agent. Full documentation of args can be found here: https://docker-py.readthedocs.io/en/stable/api.html#docker.api.container.ContainerApiMixin.create_host_config

ports Optional[List[int]]

A list of port numbers to expose on the container.

Attributes
image class-attribute
image: str
host_config class-attribute
host_config: Dict[str, Any] = None
ports class-attribute
ports: Optional[List[int]]
Functions
validate_host_config
validate_host_config(v)

Composes a model for the Docker host configuration and applies any passed values.

Source code in lume_services/services/scheduling/backends/docker.py
@validator("host_config", pre=True)
def validate_host_config(cls, v):
    """Composes a model for the Docker host configuration and applies any passed
    values.

    """
    if isinstance(v, (dict,)):
        # test host config composition using api version
        try:
            HostConfig(version=docker_api_version(), **v)
        except Exception as e:
            logger.exception(e)
            raise e

    return v
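The fail-fast idea behind this validator can be sketched without the docker package: reject a malformed host configuration at model-creation time rather than when the flow run is submitted. The allowed-keys set below is an illustrative subset, not docker-py's full `create_host_config` argument list:

```python
# Illustrative subset of docker-py host-config keys (not the full list)
_ALLOWED_HOST_CONFIG_KEYS = {"mem_limit", "shm_size", "cpu_shares"}


def validate_host_config(v):
    """Reject unknown host-config keys up front, mirroring the pre-validator."""
    if isinstance(v, dict):
        unknown = set(v) - _ALLOWED_HOST_CONFIG_KEYS
        if unknown:
            raise ValueError(f"Invalid host_config keys: {sorted(unknown)}")
    return v


print(validate_host_config({"shm_size": "2g"}))  # → {'shm_size': '2g'}
```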
build
build() -> DockerRun

Method for converting to Prefect RunConfig type DockerRun.

Returns:

Type Description
DockerRun

DockerRun

Source code in lume_services/services/scheduling/backends/docker.py
def build(self) -> DockerRun:
    """Method for converting to Prefect RunConfig type DockerRun.

    Returns:
        DockerRun

    """
    return DockerRun(**self.dict(exclude_none=True))
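The `exclude_none=True` call drops optional fields that were never set, so Prefect applies its own defaults for them. A minimal illustration of that filtering step:

```python
def exclude_none(d):
    """Drop keys whose value is None, as pydantic's dict(exclude_none=True) does."""
    return {k: v for k, v in d.items() if v is not None}


fields = {"image": "lume/model:latest", "ports": None, "env": {"DEBUG": "1"}}
print(exclude_none(fields))  # → {'image': 'lume/model:latest', 'env': {'DEBUG': '1'}}
```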

DockerBackend

Bases: ServerBackend

Implementation of Backend used for interacting with Prefect deployed in a cluster of Docker containers, as with docker-compose.

Attributes:

Name Type Description
config PrefectConfig

Instantiated PrefectConfig object describing connection to Prefect server.

_client Client

Prefect client connection created on instantiation.

_run_config_type type

Type used to compose Prefect run configuration.

Functions
run_config_type property
run_config_type()
Source code in lume_services/services/scheduling/backends/docker.py
@property
def run_config_type(self):
    return self._run_config_type

Functions

lume_services.services.scheduling.backends.kubernetes

Attributes

KUBERNETES_REQUEST_SUFFIXES module-attribute

KUBERNETES_REQUEST_SUFFIXES = [
    "EB",
    "PB",
    "TB",
    "GB",
    "MB",
    "kB",
    "EiB",
    "PiB",
    "TiB",
    "GiB",
    "MiB",
    "KiB",
]

Classes

KubernetesRunConfig

Bases: RunConfig

Pydantic representation of args to: https://docs.prefect.io/api/latest/run_configs.html#kubernetesrun https://kubernetes.io/docs/concepts/configuration/overview/#container-images

Attributes:

Name Type Description
labels Optional[List[str]]

a list of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work

env Optional[dict]

Additional environment variables to set on the job

image Optional[str]

The image to use. Can also be specified via job template.

job_template_path Optional[str]

Path to a job template to use. If a local path (no file scheme, or a file/local scheme), the job template will be loaded on initialization and stored on the KubernetesRun object as the job_template field. Otherwise the job template will be loaded at runtime on the agent. Supported runtime file schemes include s3, gcs, and agent (for paths local to the runtime agent).

job_template Optional[str]

An in-memory job template to use.

cpu_limit Union[float, str]

The CPU limit to use for the job

cpu_request Union[float, str]

The CPU request to use for the job

memory_limit Optional[str]

The memory limit to use for the job

memory_request Optional[str]

The memory request to use for the job

service_account_name Optional[str]

A service account name to use for this job. If present, overrides any service account configured on the agent or in the job template.

image_pull_secrets Optional[list]

A list of image pull secrets to use for this job. If present, overrides any image pull secrets configured on the agent or in the job template.

image_pull_policy Optional[str]

The imagePullPolicy to use for the job.

Attributes
image class-attribute
image: Optional[str]
image_pull_secrets class-attribute
image_pull_secrets: Optional[List[str]]
job_template class-attribute
job_template: Optional[dict]
job_template_path class-attribute
job_template_path: Optional[str]
service_account_name class-attribute
service_account_name: Optional[str]
image_pull_policy class-attribute
image_pull_policy: Literal[
    "Always", "IfNotPresent", "Never"
] = "IfNotPresent"
cpu_limit class-attribute
cpu_limit: Union[float, str] = 1.0
cpu_request class-attribute
cpu_request: Union[float, str] = 0.5
memory_limit class-attribute
memory_limit: Union[str, int] = None
memory_request class-attribute
memory_request: Union[str, int] = None
Functions
validate_memory
validate_memory(v)

Validate w.r.t. Kubernetes resource formats: int, fixed-point number using quantity suffixes: E, P, T, G, M, k or power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki

Source code in lume_services/services/scheduling/backends/kubernetes.py
@validator("memory_limit", "memory_request")
def validate_memory(cls, v):
    """Validate w.r.t. Kubernetes resource formats: int, fixed-point number using
    quantity suffixes: E, P, T, G, M, k or power-of-two equivalents: Ei, Pi,
    Ti, Gi, Mi, Ki

    """

    if isinstance(v, (int,)):
        return v

    elif isinstance(v, (str,)):

        acceptable = False

        # check substrings
        inclusions = [
            substring for substring in KUBERNETES_REQUEST_SUFFIXES if substring in v
        ]

        if len(inclusions):

            for inclusion in inclusions:

                try:
                    stripped = v.replace(inclusion, "")
                    _ = int(stripped)
                    acceptable = True

                except ValueError:
                    pass

        if not acceptable:
            logger.error("Kubernetes resource request invalid: %s", v)
            raise ValueError(f"Kubernetes resource request invalid: {v}")

    else:
        raise ValueError("Must provide string or int to request")

    return v
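The suffix check can be exercised standalone. The function below mirrors the validator against the KUBERNETES_REQUEST_SUFFIXES list, in a slightly tightened variant that anchors the suffix at the end of the string:

```python
SUFFIXES = ["EB", "PB", "TB", "GB", "MB", "kB",
            "EiB", "PiB", "TiB", "GiB", "MiB", "KiB"]


def validate_memory(v):
    """Accept ints, or strings of an integer followed by a known suffix."""
    if isinstance(v, int):
        return v
    if isinstance(v, str):
        for suffix in SUFFIXES:
            if v.endswith(suffix):
                try:
                    # the part before the suffix must be an integer
                    int(v[: -len(suffix)])
                    return v
                except ValueError:
                    continue
        raise ValueError(f"Kubernetes resource request invalid: {v}")
    raise ValueError("Must provide string or int to request")


print(validate_memory("512MiB"))  # → 512MiB
```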
build
build() -> KubernetesRun

Method for converting to Prefect RunConfig type KubernetesRun.

Returns:

Type Description
KubernetesRun

KubernetesRun

Source code in lume_services/services/scheduling/backends/kubernetes.py
def build(self) -> KubernetesRun:
    """Method for converting to Prefect RunConfig type KubernetesRun.

    Returns:
        KubernetesRun

    """
    # if job template and job template path missing, use packaged template
    if self.job_template is None and self.job_template_path is None:
        self.job_template = KUBERNETES_JOB_TEMPLATE

    return KubernetesRun(**self.dict(exclude_none=True))
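The fallback to the packaged template can be shown in isolation. The `DEFAULT_TEMPLATE` dict below is a stand-in for KUBERNETES_JOB_TEMPLATE, and `resolve_job_template` is a hypothetical helper mirroring the branch in `build`:

```python
# Stand-in for the packaged KUBERNETES_JOB_TEMPLATE
DEFAULT_TEMPLATE = {"apiVersion": "batch/v1", "kind": "Job"}


def resolve_job_template(job_template=None, job_template_path=None):
    """Use the packaged template only when neither a template nor a path is set."""
    if job_template is None and job_template_path is None:
        return DEFAULT_TEMPLATE, None
    return job_template, job_template_path


tmpl, path = resolve_job_template()
print(tmpl["kind"])  # → Job
```

A template path, when given, is left for the agent to resolve at runtime, so the packaged default is only used when both fields are unset.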

KubernetesBackend

Bases: ServerBackend

Implementation of Backend used for interacting with Prefect deployed in a Kubernetes cluster.

Attributes:

Name Type Description
config PrefectConfig

Instantiated PrefectConfig object describing connection to Prefect server.

_client Client

Prefect client connection created on instantiation.

_run_config_type type

Type used to compose run configuration.

Functions
run_config_type property
run_config_type()
Source code in lume_services/services/scheduling/backends/kubernetes.py
@property
def run_config_type(self):
    return self._run_config_type