Skip to content

Database

lume_services.services.results.db

Classes

ResultsDBConfig

Bases: BaseModel

ResultsDB

ResultsDB(db_config: ResultsDBConfig)

Bases: ABC

Implementation of the database.

Source code in lume_services/services/results/db.py
19
20
21
@abstractmethod
def __init__(self, db_config: ResultsDBConfig):
    """Set up the results database from its configuration.

    Args:
        db_config (ResultsDBConfig): Configuration object for the database.

    """
Functions
insert_one abstractmethod
insert_one(item: dict, **kwargs) -> str

Insert document into the database.

Parameters:

Name Type Description Default
item dict

Dictionary representation of item

required

Returns:

Name Type Description
str str

Inserted item id

Source code in lume_services/services/results/db.py
23
24
25
26
27
28
29
30
31
32
33
@abstractmethod
def insert_one(self, item: dict, **kwargs) -> str:
    """Store a single document in the database.

    Args:
        item (dict): The document to store, given as a dictionary.
        **kwargs: Extra keyword arguments accepted by the concrete
            implementation.

    Returns:
        str: Identifier of the stored document.

    """
insert_many abstractmethod
insert_many(items: List[dict], **kwargs) -> List[str]

Insert many documents into the database.

Parameters:

Name Type Description Default
items List[dict]

List of dictionary representations of items

required

Returns:

Type Description
List[str]

List[str]: List of inserted ids

Source code in lume_services/services/results/db.py
35
36
37
38
39
40
41
42
43
44
45
@abstractmethod
def insert_many(self, items: List[dict], **kwargs) -> List[str]:
    """Store several documents in the database.

    Args:
        items (List[dict]): The documents to store, each given as a
            dictionary.
        **kwargs: Extra keyword arguments accepted by the concrete
            implementation.

    Returns:
        List[str]: Identifiers of the inserted documents.

    """
find abstractmethod
find(
    *, query: dict, fields: List[str] = None, **kwargs
) -> List[dict]

Find a document based on a query.

Parameters:

Name Type Description Default
query dict

fields to query on

required
fields List[str]

List of fields to return if any

None
**kwargs dict

DB implementation specific fields

{}

Returns:

Type Description
List[dict]

List[dict]: List of dict reps of found items.

Source code in lume_services/services/results/db.py
47
48
49
50
51
52
53
54
55
56
57
58
59
@abstractmethod
def find(self, *, query: dict, fields: List[str] = None, **kwargs) -> List[dict]:
    """Look up documents that match a query.

    All arguments are keyword-only.

    Args:
        query (dict): Fields to query on.
        fields (List[str]): Fields to return, if any restriction is wanted.
        **kwargs (dict): Options specific to the database implementation.

    Returns:
        List[dict]: The matching items, each as a dictionary.

    """
find_all abstractmethod
find_all(**kwargs) -> List[dict]

Find all documents for a collection

Returns:

Type Description
List[dict]

List[dict]: List of result items represented as dict.

Source code in lume_services/services/results/db.py
61
62
63
64
65
66
67
@abstractmethod
def find_all(self, **kwargs) -> List[dict]:
    """Return every document of a collection.

    Args:
        **kwargs: Keyword arguments accepted by the concrete implementation.

    Returns:
        List[dict]: The result items, each as a dictionary.
    """
configure abstractmethod
configure(**kwargs) -> None

Configure the results db service.

Source code in lume_services/services/results/db.py
69
70
71
@abstractmethod
def configure(self, **kwargs) -> None:
    """Set up the results db service.

    Args:
        **kwargs: Keyword arguments accepted by the concrete implementation.

    """

lume_services.services.results.mongodb

Classes

MongodbResultsDBConfig

Bases: ResultsDBConfig

Configuration for connecting to Mongodb using the PyMongo driver.

Attr

database (Optional[str]): Database name used for storing results.
host (str): Host name of the mongodb service.
username (str): Username string.
password (SecretStr): Password stored as a Pydantic secret string. See https://pydantic-docs.helpmanual.io/usage/types/#secret-types
port (int): Host port of the mongodb service endpoint.
authMechanism (str): Auth mechanism supported by the PyMongo driver. See https://pymongo.readthedocs.io/en/stable/api/pymongo/database.html#pymongo.auth.MECHANISMS
options (dict): Dictionary of additional connection options for MongoClient. See https://pymongo.readthedocs.io/en/stable/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient

Attributes
database class-attribute
database: Optional[str] = Field(exclude=True)
username class-attribute
username: str
host class-attribute
host: str
password class-attribute
password: SecretStr = Field(exclude=True)
port class-attribute
port: int
authMechanism class-attribute
authMechanism: str = 'DEFAULT'
options class-attribute
options: dict = Field({}, exclude=True)
Classes
Config
Attributes
allow_population_by_field_name class-attribute
allow_population_by_field_name = True

MongodbCollection

Bases: BaseModel

Attributes
database class-attribute
database: str
name class-attribute
name: str
indices class-attribute
indices: dict

MongodbResultsDB

MongodbResultsDB(
    db_config: MongodbResultsDBConfig, connect: bool = True
)

Bases: ResultsDB

Source code in lume_services/services/results/mongodb.py
62
63
64
65
66
67
68
69
70
def __init__(self, db_config: MongodbResultsDBConfig, connect: bool = True):
    """Store the configuration and optionally connect right away.

    Args:
        db_config (MongodbResultsDBConfig): Connection configuration.
        connect (bool): When True, ``_connect`` is called at the end of
            initialization.

    """
    self.config = db_config

    # Remember the creating process id to keep multiprocessing use safe.
    self._pid = os.getpid()

    # The client and the collection metadata are held in context variables.
    self._client = ContextVar("client", default=None)
    self._collections = ContextVar("collections", default={})

    if not connect:
        return
    self._connect()
Attributes
config instance-attribute
config = db_config
Functions
disconnect
disconnect()

Disconnect mongodb connection.

Source code in lume_services/services/results/mongodb.py
120
121
122
def disconnect(self):
    """Tear down the mongodb connection by delegating to ``_disconnect``."""
    self._disconnect()
client
client() -> MongoClient

Context manager for mongoclient. Will check for multiprocessing and restart accordingly.

Source code in lume_services/services/results/mongodb.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@contextmanager
def client(self) -> MongoClient:
    """Yield a mongo client for the duration of a ``with`` block.

    ``_check_mp`` is called before the client is fetched; it is expected to
    detect multiprocessing and restart the connection accordingly.

    If no client is currently stored, one is created with ``_connect`` and
    disconnected again when the block exits. A client that already existed
    is left open.

    """
    self._check_mp()

    connection = self._client.get()

    # Only a connection opened by this call is torn down on exit.
    opened_here = connection is None
    if opened_here:
        connection = self._connect()

    try:
        yield connection

    finally:
        # Re-read the stored client: nothing to disconnect if it is gone.
        if opened_here and self._client.get():
            self._disconnect()
insert_one
insert_one(collection: str, **kwargs) -> str

Insert one document into the database.

Parameters:

Name Type Description Default
collection str

Name of collection for saving document

required
**kwargs

Kwargs contain representation of document

{}

Returns:

Name Type Description
str str

saved document id

Source code in lume_services/services/results/mongodb.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def insert_one(self, collection: str, **kwargs) -> str:
    """Insert one document into the database.

    Args:
        collection (str): Name of collection for saving document
        **kwargs: Kwargs contain representation of document; the keyword
            arguments are inserted together as a single document.

    Returns:
        str: String form of the saved document id.

    """
    with self.client() as client:
        db = client[self.config.database]
        inserted_id = db[collection].insert_one(kwargs).inserted_id

    # The driver returns its own id object; convert it so the declared
    # ``str`` return type holds, matching what insert_many returns.
    return str(inserted_id)
insert_many
insert_many(
    collection: str, items: List[dict]
) -> List[str]

Insert many documents into the database.

Parameters:

Name Type Description Default
collection str

Name of collection for saving documents

required
items List[dict]

List of dictionary reps of documents to save to database

required

Returns:

Type Description
List[str]

List[str]: List of saved document ids.

Source code in lume_services/services/results/mongodb.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def insert_many(self, collection: str, items: List[dict]) -> List[str]:
    """Insert many documents into the database.

    Args:
        collection (str): Name of collection for saving documents
        items (List[dict]): List of dictionary reps of documents to save to database

    Returns:
        List[str]: String forms of the saved document ids, in the order the
            driver reports them.

    """
    with self.client() as client:
        db = client[self.config.database]
        inserted_ids = db[collection].insert_many(items).inserted_ids

    # Ids have no ``.str`` attribute; ``str()`` is the supported conversion.
    return [str(inserted_id) for inserted_id in inserted_ids]
find
find(
    collection: str,
    query: dict = None,
    fields: List[str] = None,
) -> List[dict]

Find a document based on a query.

Parameters:

Name Type Description Default
collection str

Document type to query

required
query dict

Query in dictionary form mapping fields to values

None
fields List[str]

List of fields for filtering result

None

Returns:

Type Description
List[dict]

List[dict]: List of documents matching the query.

Source code in lume_services/services/results/mongodb.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def find(
    self, collection: str, query: dict = None, fields: List[str] = None
) -> List[dict]:
    """Find documents in a collection that match a query.

    Args:
        collection (str): Name of the collection to query
        query (dict): Query in dictionary form mapping fields to values. It is
            passed to the driver unchanged, including the default ``None``.
        fields (List[str]): List of fields for filtering result. When ``None``
            no projection is passed to the driver.

    Returns:
        List[dict]: List of documents matching the query.

    """
    with self.client() as client:
        db = client[self.config.database]
        if fields is None:
            cursor = db[collection].find(query)

        else:
            cursor = db[collection].find(query, projection=fields)

        # Exhaust the lazy cursor while the client is still open: leaving the
        # context may disconnect a client that was opened just for this call.
        return list(cursor)
find_all
find_all(collection: str) -> List[dict]

Find all documents for a collection

Parameters:

Name Type Description Default
collection str

Collection name to query

required

Returns:

Type Description
List[dict]

List[dict]: List of result documents.

Source code in lume_services/services/results/mongodb.py
211
212
213
214
215
216
217
218
219
220
221
def find_all(self, collection: str) -> List[dict]:
    """Return every document stored in a collection.

    Delegates to ``find`` with only the collection name given, so the query
    and field arguments keep their defaults.

    Args:
        collection (str): Name of the collection to read.

    Returns:
        List[dict]: Whatever ``find`` returns for that collection.

    """
    documents = self.find(collection=collection)
    return documents
configure
configure(collections: Dict[str, List[str]]) -> None

Configure the results database from collections and their indices.

Parameters:

Name Type Description Default
collections Dict[str, List[str]]

Dictionary mapping collection to index rep.

required
Source code in lume_services/services/results/mongodb.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def configure(self, collections: Dict[str, List[str]]) -> None:
    """Configure the results database from collections and their indices.

    For every collection a unique compound index over the listed fields
    (each in descending order) is created. The resulting index information
    is then wrapped in a ``MongodbCollection`` per collection and the mapping
    is stored on ``self._collections``.

    The ``collections`` argument itself is not modified.

    Args:
        collections (Dict[str, List[str]]): Dictionary mapping collection to
            index rep.

    """
    configured = {}

    with self.client() as client:
        db = client[self.config.database]

        for collection_name, index in collections.items():

            formatted_index = [(idx, DESCENDING) for idx in index]
            db[collection_name].create_index(formatted_index, unique=True)

        # Read the index metadata while the client is still open: leaving
        # the context may disconnect a client opened just for this call.
        for collection_name in collections:
            index_info = db[collection_name].index_information()

            # Collect into a new mapping so the caller's dict keeps its
            # original field lists.
            configured[collection_name] = MongodbCollection(
                database=self.config.database,
                name=collection_name,
                indices=index_info,
            )

    self._collections.set(configured)