Worker Mode

Description

This unit describes the functionality of the system's worker mode. At runtime we expect \(n\) instances of the worker to run simultaneously.

Public Interfaces

Faktory Job

The Faktory worker interface serves as the primary entry point for worker mode. The interface retrieves and processes tangle generation jobs distributed by Faktory.

State Machine
stateDiagram-v2
    state "Await job" as rj
    state "Create instance of worker class" as grp
    state "Process job" as gsp
    state error <<choice>>

    [*] --> rj
    rj --> grp
    grp --> gsp
    gsp --> error
    error --> rj : Else
    error --> [*]: Error
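The loop in the diagram can be sketched in Python. This is a minimal sketch: `fetch_job` and `make_worker` are hypothetical callables standing in for the Faktory job retrieval and worker construction described in this unit.

```python
def worker_loop(fetch_job, make_worker):
    """Run the worker-mode state machine: await a job, create a worker, process it.

    Returns the number of jobs processed. An exception escaping process()
    corresponds to the "Error" exit of the choice state and stops the loop.
    """
    processed = 0
    while True:
        job = fetch_job()          # "Await job"
        if job is None:            # no more jobs: terminate normally
            return processed
        worker = make_worker(job)  # "Create instance of worker class"
        worker.process()           # "Process job"
        processed += 1             # success: loop back to "Await job"
```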

Private Interfaces

Class Worker

The worker class describes the data and methods needed for an atomic generation job.

Process Job

The process job method contains the core logic of the worker class. The method connects to the MongoDB tangle collection and retrieves the pages pointed to by the job data. The method then calls the low-level grafting code to generate the new tangles from the pages.

State Machine
stateDiagram-v2
    state "Retrieve job" as rj
    state "Get rootstock page" as grp
    state "Get scion page" as gsp

    state "Graft pages" as gp
    state "Store new tangles" as snt
    state "Report job results" as rjr

    [*] --> rj
    rj --> grp
    grp --> gsp
    gsp --> gp
    gp --> snt
    snt --> rjr
    rjr --> [*]
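The pipeline above can be outlined one call per state. This is a hypothetical sketch: the callables stand in for the MongoDB page reads, the low-level grafting code, the batch write, and Faktory result reporting.

```python
def process_job_outline(job, get_page, graft, store, report):
    """Outline of the process-job state machine, one call per state."""
    rootstocks = get_page(job['rt_idx'], job['rt_tcn'])  # "Get rootstock page"
    scions = get_page(job['sci_idx'], job['sci_tcn'])    # "Get scion page"
    tangles = graft(rootstocks, scions)                  # "Graft pages"
    store(tangles)                                       # "Store new tangles"
    return report(job, tangles)                          # "Report job results"
```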

Unit test description

Process Job

Positive Tests

Valid job

A valid job and collection are configured. Job processing is started. The correct output tangles are written.

Inputs:

  • Mocked MongoDB collection
  • Page size set to two

Expected Output:

  • Tangles correctly generated
  • Each generated tangle appears in a page.

Negative tests

Invalid collection

An invalid collection is configured.

Inputs:

  • Mocked invalid MongoDB collection

Expected Output:

  • Exception is thrown

Invalid Index

A page index is found in the job but not found in the collection.

Inputs:

  • Mocked valid MongoDB collection
  • Invalid index

Expected Output:

  • Exception is thrown
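The negative cases might be mocked roughly as follows. This is a self-contained sketch using `unittest.mock`; `StubWorker` is a stand-in for the real `Worker`, whose `process` surfaces errors raised by the collection's `find` call.

```python
from unittest.mock import MagicMock

class StubWorker:
    """Minimal stand-in for Worker: process() reads pages from the collection."""
    def __init__(self, col):
        self._col = col

    def process(self):
        # The real process() calls find() on the collection; an invalid
        # collection or index surfaces as an exception from that call.
        return list(self._col.find({}))

# Mocked invalid collection: any find() raises.
bad_col = MagicMock()
bad_col.find.side_effect = RuntimeError('invalid collection')

try:
    StubWorker(bad_col).process()
    raised = False
except RuntimeError:
    raised = True
```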

Implementation

runner.fworker.fworker

The Faktory worker core functionality.

Worker

Worker(arbor_col, rt_idx, rt_tcn, sci_idx, sci_tcn, page_size)

Class defines an atomic worker.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `_arbor_col` | `Collection` | MongoDB collection of arborescent tangles. |
| `_rt_idx` | `str` | ID of the start of the rootstock page. |
| `_rt_tcn` | `int` | TCN of the rootstocks. |
| `_sci_idx` | `str` | ID of the start of the scion page. |
| `_sci_tcn` | `int` | TCN of the scions. |
| `_page_size` | `int` | The length of a page for the job. |
| `_tang_list` | `dict[str, dict]` | Dictionary of the generated tangles and their attributes. |
| `_notes` | `set[str]` | Set of generated tangles, used for generating non-good trees. |
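For reference, one `_tang_list` entry built by the write callback in `process` has the following shape. The field names are taken from the source; the concrete values here are hypothetical.

```python
# Hypothetical _tang_list entry, keyed by notation in the real dict.
entry = {
    'notation': 'p(n)',       # tree notation, also the deduplication key (hypothetical value)
    'TCN': 5 + 4,             # rt_tcn + sci_tcn of the job
    'is_good': True,          # set when the callback receives index 'is_good'
    'positivity': 'neutral',  # 'positive' | 'negative' | 'neutral'
}
```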

Init the worker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `arbor_col` | `Collection` | The arborescent tangle collection. | required |
| `rt_idx` | `str` | ID of the start of the rootstock page. | required |
| `rt_tcn` | `int` | TCN of the rootstocks. | required |
| `sci_idx` | `str` | ID of the start of the scion page. | required |
| `sci_tcn` | `int` | TCN of the scions. | required |
| `page_size` | `int` | The length of a page for the job. | required |
Source code in runner/fworker/fworker.py
def __init__(
    self,
    arbor_col: Collection,
    rt_idx: str,
    rt_tcn: int,
    sci_idx: str,
    sci_tcn: int,
    page_size: int,
) -> None:
    """Init the worker.

    Args:
        arbor_col: The arborescent tangle collection.
        rt_idx: ID of the start of the rootstock page.
        rt_tcn: TCN of the rootstocks.
        sci_idx: ID of the start of the scion page.
        sci_tcn: TCN of the scions.
        page_size: The length of a page for the job.
    """
    self._arbor_col = arbor_col
    self._rt_idx = rt_idx
    self._rt_tcn = rt_tcn
    self._sci_idx = sci_idx
    self._sci_tcn = sci_tcn
    self._tang_list = {}
    self._page_size = page_size
    self._notes = set()

_arbor_col instance-attribute

_arbor_col = arbor_col

_rt_idx instance-attribute

_rt_idx = rt_idx

_rt_tcn instance-attribute

_rt_tcn = rt_tcn

_sci_idx instance-attribute

_sci_idx = sci_idx

_sci_tcn instance-attribute

_sci_tcn = sci_tcn

_tang_list instance-attribute

_tang_list = {}

_page_size instance-attribute

_page_size = page_size

_notes instance-attribute

_notes = set()

_batch_write

_batch_write()

Batch write the generated data to the Mongodb collection.

Source code in runner/fworker/fworker.py
def _batch_write(self):
    """Batch write the generated data to the Mongodb collection."""
    if self._tang_list and self._arbor_col is not None:
        writes = []
        for item in self._tang_list:
            writes.append(
                UpdateOne(
                    {'notation': item},
                    {'$setOnInsert': self._tang_list[item]},
                    upsert=True,
                )
            )
        self._arbor_col.bulk_write(writes, ordered=False)
        self._tang_list = {}
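The `$setOnInsert` upsert used above means an already-stored notation is left untouched and only new notations are inserted. A pure-Python model of that behavior (a sketch against a plain dict, not MongoDB code):

```python
def set_on_insert(store, notation, doc):
    """Model UpdateOne({'notation': n}, {'$setOnInsert': doc}, upsert=True)
    against a dict keyed by notation."""
    if notation not in store:   # $setOnInsert only takes effect when no document matches
        store[notation] = doc
    return store
```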

process

process()

Process the job.

Source code in runner/fworker/fworker.py
def process(self) -> None:
    """Process the job."""

    # Start internal function def ############################################################
    def _write_callback(key: str, index: str, value: str) -> None:
        """Write callback function for cython functionality.

        Args:
            key: The notation for the tree.
            index: The name of the attribute to store.
            value: The value of the attribute.
        """
        # Add notation to notes list for un-goodifying
        if key not in self._notes:
            self._notes.add(key)

        if key not in self._tang_list:
            self._tang_list[key] = {
                'notation': key,
                'TCN': self._rt_tcn + self._sci_tcn,
            }

        if index == 'is_good':
            if value == 'true':
                self._tang_list[key][index] = True
            else:
                self._tang_list[key][index] = False
        elif index == 'positivity':
            self._tang_list[key][index] = value

    # End internal function def ##############################################################

    # Get list of rootstocks
    rootstocks = list(
        self._arbor_col.find(
            {'TCN': self._rt_tcn, '_id': {'$gte': ObjectId(self._rt_idx)}},
            projection={'notation': 1, 'positivity': 1},
            sort={'_id': 1},
        ).limit(self._page_size)
    )

    # Process rootstocks into positivity equivalence. This reduces unneeded computation.
    rs_by_positivity = {'positive': [], 'negative': [], 'neutral': []}
    for rootstock in rootstocks:
        rs_by_positivity[rootstock['positivity']].append(rootstock['notation'])

    # Get list of good scions
    scions = list(
        self._arbor_col.find(
            {
                'TCN': self._sci_tcn,
                'is_good': True,
                '_id': {'$gte': ObjectId(self._sci_idx)},
            },
            projection={'notation': 1, 'positivity': 1},
            sort={'_id': 1},
        ).limit(self._page_size)
    )

    # Process scions into positivity equivalence. This reduces unneeded computation.
    sci_by_positivity = {'positive': [], 'negative': [], 'neutral': []}
    for scion in scions:
        sci_by_positivity[scion['positivity']].append(scion['notation'])

    # Process rootstocks and scions into combinations that can generate canonical trees.
    for job in [
        (rs_by_positivity['positive'], sci_by_positivity['positive']),
        (rs_by_positivity['positive'], sci_by_positivity['neutral']),
        (rs_by_positivity['negative'], sci_by_positivity['negative']),
        (rs_by_positivity['negative'], sci_by_positivity['neutral']),
        (rs_by_positivity['neutral'], sci_by_positivity['positive']),
        (rs_by_positivity['neutral'], sci_by_positivity['negative']),
        (rs_by_positivity['neutral'], sci_by_positivity['neutral']),
    ]:
        if job[0] and job[1]:
            if lib_wrapper.run(job[0], job[1], _write_callback):
                self._batch_write()
            else:
                raise NameError(
                    'Generation went wrong'
                ) from None  # @@@IMPROVEMENT: needs to be updated to exception object

    # un-goodifying everything we generated
    if lib_wrapper.run(['i[0]'], list(self._notes), _write_callback):
        self._batch_write()
    else:
        raise NameError(
            'Generation Went Wrong'
        ) from None  # @@@IMPROVEMENT: needs to be updated to exception object
    self._batch_write()
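The seven pairings tried by `process` can be listed explicitly. Opposite-sign pairs (positive with negative) are omitted because, per the comment in the source, they cannot generate canonical trees, which reduces unneeded computation.

```python
# (rootstock positivity, scion positivity) combinations from process().
PAIRINGS = [
    ('positive', 'positive'),
    ('positive', 'neutral'),
    ('negative', 'negative'),
    ('negative', 'neutral'),
    ('neutral', 'positive'),
    ('neutral', 'negative'),
    ('neutral', 'neutral'),
]
```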

faktory_job

faktory_job(rt_idx, rt_tcn, sci_idx, sci_tcn, page_size)

Pyfaktory worker callback for generating RLITT.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `rt_idx` | `str` | ID of the start of the rootstock page. | required |
| `rt_tcn` | `int` | TCN of the rootstocks. | required |
| `sci_idx` | `str` | ID of the start of the scion page. | required |
| `sci_tcn` | `int` | TCN of the scions. | required |
| `page_size` | `int` | The size of the page of tangles to retrieve. | required |
Source code in runner/fworker/fworker.py
def faktory_job(rt_idx: str, rt_tcn: int, sci_idx: str, sci_tcn: int, page_size: int):
    """Pyfaktory worker callback for generating RLITT.

    Args:
        rt_idx: ID of the start of the rootstock page.
        rt_tcn: TCN of the rootstocks.
        sci_idx: ID of the start of the scion page.
        sci_tcn: TCN of the scions.
        page_size: The size of the page of tangles to retrieve.
    """
    db_cfg = cfg.cfg_dict['db-connection-info']
    dbc = odm.get_db(
        db_cfg['domain'],
        db_cfg['port'],
        db_cfg['user'],
        db_cfg['password'],
        db_cfg['database'],
    )
    job = Worker(odm.get_arborescent_collection(dbc), rt_idx, rt_tcn, sci_idx, sci_tcn, page_size)
    job.process()
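Wiring this callback into a Faktory consumer might look like the following. This is a hypothetical sketch: the job type name `'graft_tangles'`, the queue, and the URL are assumptions, and the client/consumer classes are passed in as parameters so the sketch stays testable; the real code would use pyfaktory's `Client` and `Consumer`.

```python
def register_worker(Client, Consumer, callback, n_workers=4):
    """Wire the job callback into a Faktory consumer (pyfaktory-style API assumed)."""
    client = Client(faktory_url='tcp://localhost:7419', role='consumer')  # URL is an assumption
    consumer = Consumer(client=client, queues=['default'], concurrency=n_workers)
    consumer.register('graft_tangles', callback)  # job type name is an assumption
    return consumer  # caller invokes consumer.run() to start the n workers
```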