Skip to content

Database

lume_services.services.models.db.db

Classes

ModelDBConfig

Bases: BaseModel

ModelDB

ModelDB(db_config: ModelDBConfig)

Bases: ABC

Source code in lume_services/services/models/db/db.py
17
18
19
@abstractmethod
def __init__(self, db_config: ModelDBConfig):
    ...
Functions
execute abstractmethod
execute(statement: Executable)

Generic execution method for statements.

Parameters:

Name Type Description Default
statement(Executable)

An executable statement (select, insert...)

required
Source code in lume_services/services/models/db/db.py
21
22
23
24
25
26
27
28
@abstractmethod
def execute(self, statement: Executable):
    """Generic execution method for statements.

    Args:
        statement(Executable): An executable statement (select, insert...)
    """
    ...
select abstractmethod
select(statement: Select)

Method for executing selection statements.

Parameters:

Name Type Description Default
statement Select

Executable selection statement.

required
Source code in lume_services/services/models/db/db.py
30
31
32
33
34
35
36
37
38
@abstractmethod
def select(self, statement: Select):
    """Method for executing selection statements.

    Args:
        statement (Select): Executable selection statement.

    """
    ...
insert abstractmethod
insert(statement: Insert)

Method for executing insert statements.

Parameters:

Name Type Description Default
statement Insert

Executable insert statement.

required
Source code in lume_services/services/models/db/db.py
40
41
42
43
44
45
46
47
48
@abstractmethod
def insert(self, statement: Insert):
    """Method for executing insert statements.

    Args:
        statement (Insert): Executable insert statement.

    """
    ...
insert_many abstractmethod
insert_many(table_row_obj: List[Insert])

Method accepting list of Insert statements. This is distinguished from the base insert method because many services will use context managment for the management of their sessions.

Parameters:

Name Type Description Default
List[Insert]

List of executable insert statements.

required
Source code in lume_services/services/models/db/db.py
50
51
52
53
54
55
56
57
58
59
60
@abstractmethod
def insert_many(self, table_row_obj: List[Insert]):
    """Method accepting list of Insert statements. This is distinguished from the
    base insert method because many services will use context managment for the
    management of their sessions.

    Args:
        List[Insert]: List of executable insert statements.

    """
    ...
from_config_init
from_config_init(*args, **kwargs)

Convenience function for initializing config and db.

Source code in lume_services/services/models/db/db.py
62
63
64
@abstractclassmethod
def from_config_init(cls, *args, **kwargs):
    """Convenience function for initializing config and db."""

lume_services.services.models.db.mysql

Classes

ConnectionConfig

Bases: BaseModel

Configuration for creating sqlalchemy engine.

Parameters:

Name Type Description Default
pool_size int

Number of connections to maintain in the connection pool. Establishing connections is expensive and maintaining multiple connections in a pool allows for availability.

required
pool_pre_ping bool required
Attributes
pool_size class-attribute
pool_size: Optional[int]
pool_pre_ping class-attribute
pool_pre_ping: bool = True

MySQLModelDBConfig

Bases: ModelDBConfig

Configuration for MySQL connection.

Parameters:

Name Type Description Default
host str required
port str required
user str required
password SecretStr required
database str required
connection ConnectionConfig

Configuration options for creating sqlalchemy engine.

required
Attributes
host class-attribute
host: str
port class-attribute
port: int
user class-attribute
user: str
password class-attribute
password: SecretStr = Field(exclude=True)
database class-attribute
database: str
connection class-attribute
connection: ConnectionConfig = ConnectionConfig()

MySQLModelDB

MySQLModelDB(config: MySQLModelDBConfig)

Bases: ModelDB

MySQL implementation of the DBService client, allowing for Model DB connections to MySQL model db.

Initialize MySQL client service.

Parameters:

Name Type Description Default
config MySQLModelDBConfig

MySQL connection config

required
Source code in lume_services/services/models/db/mysql.py
64
65
66
67
68
69
70
71
72
73
def __init__(self, config: MySQLModelDBConfig):
    """Initialize MySQL client service.

    Args:
        config (MySQLModelDBConfig): MySQL connection config

    """
    self.config = config

    self._create_engine()
Attributes
config instance-attribute
config = config
Functions
connection
connection() -> Connection

Context manager for operations. Will clean up connections on exit of scope.

Source code in lume_services/services/models/db/mysql.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
@contextmanager
def connection(self) -> Connection:
    """Context manager for operations. Will clean up connections on exit of
    scope.

    """

    self._check_mp()

    # get connection
    cxn = self._connection.get()

    if cxn is None:
        cxn = self._connect()
        cleanup = True

    else:
        cleanup = False

    try:
        yield cxn

    finally:
        if cleanup:
            cxn = self._connection.get()

            if cxn:
                cxn.close()
                self._connection.set(None)
session
session() -> Session

Establishes Session with active connection.

Note: Setting expire_on_commit to False allows us to access objects after session closing.

Source code in lume_services/services/models/db/mysql.py
145
146
147
148
149
150
151
152
153
154
155
156
157
def session(self) -> Session:
    """Establishes Session with active connection.

    Note: Setting expire_on_commit to False allows us to access objects
    after session closing.

    """
    logger.debug("MySQLModelDB creating session.")
    with self.connection():
        session = self._sessionmaker()
        logger.debug("MySQLModelDB session created.")
        session.expire_on_commit = False
        return session
execute
execute(sql) -> list

Execute sql inside a managed session.

Parameters:

Name Type Description Default
sql

Execute a query

required
Results

list: Results of query operation

Source code in lume_services/services/models/db/mysql.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def execute(self, sql) -> list:
    """Execute sql inside a managed session.

    Args:
        sql: Execute a query

    Results:
        list: Results of query operation

    """
    logger.info("MySQLModelDB executing: %s", str(sql))
    with self.session() as session:

        res = session.execute(sql)
        session.commit()

    logger.info("MySQLModelDB executed: %s", str(sql))

    return res
select
select(sql: Select) -> list

Execute sql query inside a managed session.

Parameters:

Name Type Description Default
sql Select

Execute a selection query

required
Results

list: Results of selection operation

Source code in lume_services/services/models/db/mysql.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def select(self, sql: Select) -> list:
    """Execute sql query inside a managed session.

    Args:
        sql: Execute a selection query

    Results:
        list: Results of selection operation

    """
    logger.info("MySQLModelDB selecting: %s", str(sql))
    with self.session() as session:

        res = session.execute(sql).scalars().all()
        session.commit()

    return res
insert
insert(sql: Insert)

Execute and insert operation inside a managed session.

Parameters:

Name Type Description Default
sql Insert

Sqlalchemy insert operation

required

Returns:

Type Description

Union[str, int]: primary key returned from insert operation

Source code in lume_services/services/models/db/mysql.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def insert(self, sql: Insert):
    """Execute and insert operation inside a managed session.

    Args:
        sql (Insert): Sqlalchemy insert operation

    Returns:
        Union[str, int]: primary key returned from insert operation

    """
    logger.info("MySQLModelDB inserting: %s", str(sql))
    with self.session() as session:

        res = session.execute(sql)
        session.commit()

    logger.info("Sucessfully executed: %s", str(sql))

    return res.inserted_primary_key
insert_many
insert_many(sql: List[Insert]) -> List[Union[str, int]]

Execute many inserts within a managed session.

Parameters:

Name Type Description Default
sql List[Insert]

Execute a sqlalchemy insert operation

required

Returns:

Type Description
List[Union[str, int]]

List[Union[str, int]]: List of primary keys returned from insert operation

Source code in lume_services/services/models/db/mysql.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def insert_many(self, sql: List[Insert]) -> List[Union[str, int]]:
    """Execute many inserts within a managed session.

    Args:
        sql (List[Insert]): Execute a sqlalchemy insert operation

    Returns:
        List[Union[str, int]]: List of primary keys returned from insert operation

    """
    logger.info(
        "MySQLModelDB inserting many: %s", [str(statement) for statement in sql]
    )
    with self.session() as session:

        results = []

        for stmt in sql:
            res = session.execute(stmt)
            results.append(res)

        session.commit()

    logger.info("Sucessfully executed: %s", [str(statement) for statement in sql])

    return [res.inserted_primary_key for res in results]
from_config_init classmethod
from_config_init(**kwargs)

Initialize database handler from MySQLModelDBConfig kwargs.

Source code in lume_services/services/models/db/mysql.py
244
245
246
247
248
@classmethod
def from_config_init(cls, **kwargs):
    """Initialize database handler from MySQLModelDBConfig kwargs."""
    config = MySQLModelDBConfig(**kwargs)
    return cls(config=config)

lume_services.services.models.db.schema

Attributes

Base module-attribute

Base = declarative_base()

Classes

Model

Bases: Base

Attributes
model_id class-attribute
model_id = Column(
    "model_id",
    Integer,
    primary_key=True,
    autoincrement=True,
)
created class-attribute
created = Column(
    "created",
    DateTime(timezone=True),
    server_default=func.now(),
)
author class-attribute
author = Column('author', String(255), nullable=False)
laboratory class-attribute
laboratory = Column(
    "laboratory", String(255), nullable=False
)
facility class-attribute
facility = Column('facility', String(255), nullable=False)
beampath class-attribute
beampath = Column('beampath', String(255), nullable=False)
description class-attribute
description = Column(
    "description", String(255), nullable=False
)
deployment class-attribute
deployment = relationship('Deployment', backref='model')

Deployment

Bases: Base

Attributes
deployment_id class-attribute
deployment_id = Column(
    "deployment_id",
    Integer,
    primary_key=True,
    autoincrement=True,
)
version class-attribute
version = Column('version', String(255), nullable=False)
deploy_date class-attribute
deploy_date = Column(
    "deploy_date",
    DateTime(timezone=True),
    server_default=func.now(),
)
asset_dir class-attribute
asset_dir = Column('asset_dir', String(255), nullable=True)
source class-attribute
source = Column('source', String(255), nullable=True)
sha_256 class-attribute
sha_256 = Column('sha256', String(255), nullable=False)
image class-attribute
image = Column('image', String(255), nullable=True)
is_live class-attribute
is_live = Column('is_live', Boolean, nullable=False)
model_id class-attribute
model_id = Column(
    "model_id",
    ForeignKey("model.model_id"),
    nullable=False,
    onupdate="cascade",
)
flow class-attribute
flow = relationship('Flow', backref='deployment')

Project

Bases: Base

Attributes
project_name class-attribute
project_name = Column(
    "project_name", String(255), primary_key=True
)
description class-attribute
description = Column(
    "description", String(255), nullable=False
)
flows class-attribute
flows = relationship('Flow', backref='project')

Flow

Bases: Base

Attributes
flow_id class-attribute
flow_id = Column(
    "flow_id", String(255), primary_key=True, nullable=False
)
flow_name class-attribute
flow_name = Column("flow_name", String(255), nullable=False)
project_name class-attribute
project_name = Column(
    "project_name",
    ForeignKey("project.project_name"),
    nullable=False,
    onupdate="cascade",
)
deployment_id class-attribute
deployment_id = Column(
    "deployment_id",
    ForeignKey("deployment.deployment_id"),
    nullable=False,
    onupdate="cascade",
)

FlowOfFlows

Bases: Base

Attributes
id class-attribute
id = Column(
    "_id", Integer, primary_key=True, autoincrement=True
)
parent_flow_id class-attribute
parent_flow_id = Column(
    "parent_flow_id",
    ForeignKey("flow.flow_id"),
    nullable=False,
    onupdate="cascade",
)
flow_id class-attribute
flow_id = Column(
    "flow_id",
    ForeignKey("flow.flow_id"),
    nullable=False,
    onupdate="cascade",
)
position class-attribute
position = Column('position', Integer, nullable=False)
parent class-attribute
parent = relationship(
    "Flow",
    foreign_keys="FlowOfFlows.parent_flow_id",
    backref="composing_flows",
    lazy="joined",
)
flow class-attribute
flow = relationship(
    "Flow",
    foreign_keys="FlowOfFlows.flow_id",
    lazy="joined",
)