Producer Mode

Description

This unit describes the functionality of producer mode. Only a single instance of the producer is expected to be running at any time.

Public Interfaces

Faktory Producer

The Faktory producer interface serves as the primary entry point for producer mode. The interface implements the job production state machine found in the README.

Private Interfaces

Process Stencil

A stencil is an ordered pair \((j,k)\) with \(j+k=\text{TCN}\), where TCN is the target tree crossing number. Stencils are used to produce jobs (the job structure is described in the README) by paginating an input collection of tangles.
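The pair structure can be illustrated with a short sketch. The helper name stencils_for_tcn is hypothetical, and the sketch assumes both components are at least 1; how the project actually seeds its stencil collection is not shown here.

```python
from typing import Iterator


def stencils_for_tcn(tcn: int) -> Iterator[tuple[int, int]]:
    """Yield every ordered pair (j, k) with j + k == tcn.

    Assumption for this sketch: both components are at least 1.
    """
    for j in range(1, tcn):
        yield (j, tcn - j)


pairs = list(stencils_for_tcn(5))
print(pairs)  # [(1, 4), (2, 3), (3, 2), (4, 1)]
```

Because the pairs are ordered, (1, 4) and (4, 1) count as distinct stencils.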

State machine
stateDiagram-v2
    state "Get stencil" as gs
    state "Find rootstock pages" as frp
    state "Find scion pages" as fsp
    state "Mark stencil done" as msd


    [*]--> gs
    gs --> frp
    frp --> fsp
    fsp --> forp

    forp: For each rootstock page
    state forp{
        fosp: For each scion page
        state fosp{
            state "Build jobs" as bj
            state "Enqueue jobs" as ej
            [*] --> bj
            bj --> ej
            ej --> [*]
        }
        [*]-->fosp
        fosp-->[*]
    }
    forp --> msd
    msd --> [*]
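The two nested composite states amount to a double loop over page lists. The following is a minimal sketch of that control flow only; plan_jobs and the page labels are made up for illustration, and real jobs carry more arguments than a pair of page IDs.

```python
from typing import Iterator


def plan_jobs(
    rootstock_pages: list[str], scion_pages: list[str]
) -> Iterator[tuple[str, str]]:
    """Yield one (rootstock page, scion page) pair per job to build."""
    for root_page in rootstock_pages:      # "For each rootstock page"
        for scion_page in scion_pages:     # "For each scion page"
            yield (root_page, scion_page)  # "Build jobs" then "Enqueue jobs"


jobs = list(plan_jobs(['r0', 'r2'], ['s0', 's2', 's4']))
print(len(jobs))  # 6
```

The number of jobs per stencil is therefore the product of the two page counts.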

Unit test description

Process Stencil

Only the _process_stencil function is unit tested; the other interfaces are skipped due to a lack of mock tooling.

Positive Tests

Valid Stencil

A valid stencil and collection are configured. The correct output jobs are created.

Inputs:

  • Mocked MongoDB collection
  • Page size set to two

Expected Output:

  • Jobs generated

Negative Tests

Empty collection

An empty collection is configured.

Inputs:

  • Mocked empty MongoDB collection

Expected Output:

  • Exception is thrown
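A rough sketch of the empty-collection case, using unittest.mock from the standard library in place of MongoDB. The helper first_page_id is hypothetical; it only mirrors the first lookup that _paginate_filter performs, and the filter value is arbitrary.

```python
from unittest.mock import MagicMock


def first_page_id(collection, mongo_filter):
    """Hypothetical helper mirroring the first lookup made during pagination."""
    doc = collection.find_one(mongo_filter, projection={'_id': 1}, sort={'_id': 1})
    return doc['_id']  # fails with TypeError when doc is None


empty_col = MagicMock()
empty_col.find_one.return_value = None  # find_one yields None when nothing matches

try:
    first_page_id(empty_col, {'TCN': 3})
    raised = None
except TypeError as exc:
    raised = exc

print(type(raised).__name__)  # TypeError
```

With an empty collection the lookup returns None, so subscripting the result is what raises the exception.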

Implementation

runner.fproducer.fproducer

The faktory client core functionality.

_open_stencil_filter

_open_stencil_filter(tcn)

Generate a mongo filter for open stencils.

Parameters:

Name  Type  Description            Default
tcn   int   The TCN to filter for  required

Returns:

Type  Description
dict  A mongo filter

Source code in runner/fproducer/fproducer.py
def _open_stencil_filter(tcn: int) -> dict:
    """Generate a mongo filter for open stencils.

    Args:
        tcn: The TCN to filter for

    Returns:
        A mongo filter
    """
    return {
        '$and': [
            {'state': {'$ne': odm.StencilStateEnum.complete}},
            {'state': {'$ne': odm.StencilStateEnum.no_headroom}},
            {'TCN': {'$lte': tcn}},
            {'rootstock_tcn': {'$ne': 0}},
            {'_id': {'$ne': 'config'}},
        ]
    }
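As a point of comparison, the same conditions can be written more compactly: MongoDB treats multiple top-level fields as an implicit AND, and two $ne tests on one field can be merged into a single $nin. The sketch below should select the same documents. The function name is hypothetical, and the state values are placeholder strings standing in for odm.StencilStateEnum members, whose real values are not shown in this document.

```python
def open_stencil_filter_compact(tcn: int, closed_states: list[str]) -> dict:
    """Build a filter for open stencils using implicit AND and $nin.

    `closed_states` stands in for the complete / no_headroom enum values.
    """
    return {
        'state': {'$nin': closed_states},
        'TCN': {'$lte': tcn},
        'rootstock_tcn': {'$ne': 0},
        '_id': {'$ne': 'config'},
    }


# Placeholder state strings, chosen for illustration only.
flt = open_stencil_filter_compact(7, ['complete', 'no_headroom'])
print(flt['state'])  # {'$nin': ['complete', 'no_headroom']}
```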

_get_stencil_config

_get_stencil_config(stencil_col)

Get the stencil configuration document.

Parameters:

Name         Type        Description              Default
stencil_col  Collection  The stencil collection.  required

Returns:

Type        Description
StencilCfg  The configuration document for the given collection.

Source code in runner/fproducer/fproducer.py
def _get_stencil_config(stencil_col: Collection) -> odm.StencilCfg:
    """Get the stencil configuration document.

    Args:
        stencil_col: The stencil collection.

    Returns:
        The configuration document for the given collection.
    """
    stencil = stencil_col.find_one({'_id': 'config'})
    return from_dict(data_class=odm.StencilCfg, data=stencil)

_paginate_filter

_paginate_filter(arbor_col, mongo_filter)

Build a list of page cursors for a mongodb filter.

Parameters:

Name          Type        Description                             Default
arbor_col     Collection  The collection of arborescent tangles.  required
mongo_filter  dict        The filter to paginate                  required

Returns:

Type       Description
list[str]  A list of IDs, each marking the start of a page.

Source code in runner/fproducer/fproducer.py
def _paginate_filter(arbor_col: Collection, mongo_filter: dict) -> list[str]:
    """Build a list of page cursors for a mongodb filter.

    Args:
        arbor_col: The collection of arborescent tangles.
        mongo_filter: The filter to paginate

    Returns:
        A list of IDs, each marking the start of a page.
    """
    page_list = []
    cursor = (arbor_col.find_one(mongo_filter, projection={'_id': 1}, sort={'_id': 1}))['_id']
    page_list.append(cursor)
    while tangdb := (
        arbor_col.find_one(
            {'$and': [mongo_filter, {'_id': {'$gte': cursor}}]},
            projection={'_id': 1},
            sort={'_id': 1},
            skip=int(cfg.cfg_dict['tangle-collections']['page_size']),
        )
    ):
        cursor = tangdb['_id']
        page_list.append(tangdb['_id'])
    return page_list
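The cursor-plus-skip idea can be tried without a database. The sketch below applies the same stepping rule to a sorted Python list: keep the IDs at or after the cursor, skip one full page, and take the next ID as the new page start. The name page_starts is hypothetical, and the explicit ValueError for empty input is a choice made for this sketch rather than the behaviour of the function above.

```python
def page_starts(sorted_ids: list[int], page_size: int) -> list[int]:
    """Return the first ID of each page of `page_size` items."""
    if not sorted_ids:
        raise ValueError('nothing to paginate')
    cursor = sorted_ids[0]
    starts = [cursor]
    while True:
        # IDs >= cursor, then skip one full page: the same idea as $gte + skip.
        remaining = [i for i in sorted_ids if i >= cursor]
        if len(remaining) <= page_size:
            break  # no document lies a full page beyond the cursor
        cursor = remaining[page_size]
        starts.append(cursor)
    return starts


starts = page_starts([10, 11, 12, 13, 14], page_size=2)
print(starts)  # [10, 12, 14]
```

The last page may be shorter than the page size, as with the single ID 14 here.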

_process_stencil

_process_stencil(stencil_col, arbor_col, stencil)

Process a given stencil into jobs and push to faktory.

Parameters:

Name         Type        Description                             Default
stencil_col  Collection  The collection of stencils.             required
arbor_col    Collection  The collection of arborescent tangles.  required
stencil      StencilDB   The stencil to process.                 required

Source code in runner/fproducer/fproducer.py
def _process_stencil(
    stencil_col: Collection, arbor_col: Collection, stencil: odm.StencilDB
) -> Generator[Job, None, None]:
    """Process a given stencil into jobs and push to faktory.

    Args:
        stencil_col: The collection of stencils.
        arbor_col: The collection of arborescent tangles.
        stencil: The stencil to process.

    Yields:
        One job per (rootstock page, scion page) pair.
    """
    stencil_col.update_one({'_id': stencil._id}, {'$set': {'state': odm.StencilStateEnum.started}})

    tasks = [
        _paginate_filter(arbor_col, {'TCN': stencil.rootstock_tcn}),
        _paginate_filter(arbor_col, {'TCN': stencil.scion_tcn, 'is_good': True}),
    ]

    for root_idx in tasks[0]:
        for scion_idx in tasks[1]:
            job = Job(
                jobtype='arbor_job',
                args=[
                    str(root_idx),
                    stencil.rootstock_tcn,
                    str(scion_idx),
                    stencil.scion_tcn,
                    int(cfg.cfg_dict['tangle-collections']['page_size']),
                ],
                queue='arborescent',
            )
            yield job
    stencil_col.update_one({'_id': stencil._id}, {'$set': {'state': odm.StencilStateEnum.complete}})
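Since _process_stencil is a generator, the statements before and after its loop run lazily: nothing executes until the caller starts iterating, and the code after the loop runs only once the generator is exhausted. A minimal sketch of that behaviour, with a plain list standing in for the stencil collection and a hypothetical function name:

```python
from typing import Iterator


def process(pages: list[str], log: list[str]) -> Iterator[str]:
    """Yield pages, recording state changes in `log` instead of MongoDB."""
    log.append('started')    # runs on the first next(), not at call time
    for page in pages:
        yield page
    log.append('complete')   # runs only once the generator is exhausted


log: list[str] = []
gen = process(['p0', 'p1'], log)
state_before = list(log)     # nothing has run yet
first = next(gen)
state_mid = list(log)        # only the opening state change has happened
rest = list(gen)
state_after = list(log)      # both state changes have happened

print(state_before, state_mid, state_after)
```

If the consumer stops iterating early, the closing update never runs and the stencil stays in the started state, which appears to be the case the _recovery function picks up on the next start.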

_recovery

_recovery(arbor_col, stencil_col, stencil_cfg, producer)

On start recover previous state from stencil collection.

Parameters:

Name         Type        Description                                            Default
arbor_col    Collection  The collection of arborescent tangles.                 required
stencil_col  Collection  The collection of stencils.                            required
stencil_cfg  StencilCfg  The configuration document of the stencil collection.  required
producer     Producer    The faktory producer.                                  required

Source code in runner/fproducer/fproducer.py
def _recovery(
    arbor_col: Collection, stencil_col: Collection, stencil_cfg: odm.StencilCfg, producer: Producer
) -> None:
    """On start recover previous state from stencil collection.

    Args:
        arbor_col: The collection of arborescent tangles.
        stencil_col: The collection of stencils.
        stencil_cfg: The configuration document of the stencil collection.
        producer: The faktory producer.
    """
    for stencildb in stencil_col.find(
        {
            '$and': [
                {
                    '$or': [
                        {'state': odm.StencilStateEnum.new},
                        {'state': odm.StencilStateEnum.started},
                    ]
                },
                {'TCN': stencil_cfg.current_completed_tcn + 1},
            ]
        }
    ):
        stencil = from_dict(data_class=odm.StencilDB, data=stencildb)
        for job in _process_stencil(stencil_col, arbor_col, stencil):
            producer.push(job)

faktory_producer

faktory_producer()

Pyfaktory producer.

Source code in runner/fproducer/fproducer.py
def faktory_producer() -> None:
    """Pyfaktory producer."""
    with Client(
        faktory_url=f'tcp://{cfg.cfg_dict["faktory-connection-info"]["domain"]}:{cfg.cfg_dict["faktory-connection-info"]["port"]}'
    ) as client:
        db_cfg = cfg.cfg_dict['db-connection-info']
        dbc = odm.get_db(
            db_cfg['domain'],
            db_cfg['port'],
            db_cfg['user'],
            db_cfg['password'],
            db_cfg['database'],
        )
        stencil_col = odm.get_stencil_collection(dbc)
        arbor_col = odm.get_arborescent_collection(dbc)
        stencil_cfg = _get_stencil_config(stencil_col)
        producer = Producer(client=client)
        _recovery(arbor_col, stencil_col, stencil_cfg, producer)
        while True:
            server_info = (client.info())['faktory']

            queue_count = (
                server_info['sets']['dead']
                + server_info['sets']['retries']
                + server_info['sets']['scheduled']
                + server_info['sets']['working']
            )
            if 'arborescent' in server_info['queues']:
                queue_count += server_info['queues']['arborescent']
            stencil_count = stencil_col.count_documents(
                _open_stencil_filter(stencil_cfg.current_completed_tcn + 1)
            )
            if queue_count == 0 and stencil_count == 0:
                stencil_cfg.current_completed_tcn += 1
                stencil_col.update_one(
                    {'_id': 'config'},
                    {'$set': {'current_completed_tcn': stencil_cfg.current_completed_tcn}},
                )
                producer = Producer(client=client)
                for stencildb in stencil_col.find(
                    _open_stencil_filter(stencil_cfg.current_completed_tcn + 1)
                ):
                    stencil = from_dict(data_class=odm.StencilDB, data=stencildb)
                    for job in _process_stencil(stencil_col, arbor_col, stencil):
                        producer.push(job)

            time.sleep(3)
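The idle check in the loop can be isolated into a pure function, which makes it testable without a Faktory server. This is a sketch only: outstanding_jobs is a hypothetical name, and the sample dictionary is made-up data that mirrors just the keys the loop above reads.

```python
def outstanding_jobs(server_info: dict, queue: str = 'arborescent') -> int:
    """Count jobs that are dead, retrying, scheduled, working, or queued."""
    sets = server_info['sets']
    count = sets['dead'] + sets['retries'] + sets['scheduled'] + sets['working']
    # The queue key is absent until a job has been enqueued on it.
    count += server_info['queues'].get(queue, 0)
    return count


# Made-up server info for illustration.
sample = {
    'sets': {'dead': 0, 'retries': 1, 'scheduled': 0, 'working': 2},
    'queues': {'arborescent': 4},
}
total = outstanding_jobs(sample)
print(total)  # 7
```

The producer advances to the next TCN only when this count and the number of open stencils are both zero.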