Urd Example - Part 2: Let Urd Keep Track of Datasets

Urd is the Norse goddess of the past. It is also the name of the Accelerator’s database that keeps track of built jobs and their dependencies.

In this post, we show how to import a list of files into a dataset chain, and how to store and retrieve that dataset chain using Urd.

The presented solution will automatically handle the case when new files are added to a project. Re-running the build script will import and chain all files that are new since the last run. References in Urd will be updated as well, and previous references will still be available for reproducibility. Data is never lost or overwritten using the Accelerator.

This is the second part of a series of three posts. The other posts are Part 1: Plain Import of a List of Files and Part 3: Appending New Columns to an Existing Dataset Chain.

Background

Urd keeps track of jobs and their dependencies on other jobs. Two key Urd concepts are the session and the list.

Any session can be looked up if we know the timestamp and the name of the list. For example, when importing a list of files, each import is stored in its own session using a unique timestamp, and all sessions are located in one list, for example named import.

Note that an Urd timestamp can be an integer, a date/time with a resolution ranging from months down to microseconds, or a tuple containing both. This makes indexing of sessions very flexible.
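The relation between lists, sessions and timestamps can be pictured with a small toy model. This is only a sketch to illustrate the concepts: the class ToyUrd and its methods are hypothetical, it handles string timestamps only, and it is not how the Urd server is implemented.

```python
# Toy in-memory model of Urd lists and sessions. Illustration only:
# this is not the Accelerator's implementation or API.
class ToyUrd:
    def __init__(self):
        self.lists = {}  # list name -> {timestamp: joblist}

    def add(self, name, timestamp, joblist):
        sessions = self.lists.setdefault(name, {})
        if timestamp in sessions:
            # The toy refuses to reuse a timestamp within a list.
            raise ValueError('timestamp already used: %r' % (timestamp,))
        sessions[timestamp] = list(joblist)

    def get(self, name, timestamp):
        # A session is identified by list name plus timestamp.
        return self.lists[name][timestamp]

    def since(self, name, timestamp):
        # All timestamps in the list that sort after the given one.
        return sorted(ts for ts in self.lists.get(name, {}) if ts > timestamp)

    def latest(self, name):
        # Joblist of the session with the highest timestamp, or [] if none.
        sessions = self.lists.get(name, {})
        return sessions[max(sessions)] if sessions else []


toy = ToyUrd()
toy.add('import', '2009-01', [('csvimport', 'dev-0')])
toy.add('import', '2009-02', [('csvimport', 'dev-1')])

print(toy.get('import', '2009-01'))
print(toy.since('import', '2009-01'))
print(toy.latest('import'))
```

Each list maps timestamps to sessions, so knowing the list name and a timestamp is enough to look up exactly one session.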

A Build Script for Importing Files and Storing References in Urd

We want to import these files in date order and create a dataset chain.

files = (
    ('2009-01', 'yellow_tripdata_2009-01.csv'),
    ('2009-02', 'yellow_tripdata_2009-02.csv'),
    ('2009-03', 'yellow_tripdata_2009-03.csv'),
)

The difference from the previous post is that here, each file has a timestamp. Urd will associate each timestamp with a session holding the jobs used for that particular import. Thus, we can ask Urd to return a session containing the state of any import or data processing operation, the way it looked at any particular instant in time. This is extremely powerful!

Here is a complete build script that imports and chains the files in increasing time order, and stores sessions with references to all jobs, together with their timestamps, in Urd.

def main(urd):
    key = 'import'
    last_import = urd.peek_latest(key).timestamp
    for ts, filename in files:
        if ts > last_import:
            urd.begin(key, ts)
            previous = urd.latest(key).joblist.get(-1)
            urd.build('csvimport', filename=filename, previous=previous)
            urd.finish(key)

Detailed explanation:

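The script first reads the timestamp of the most recent session in the import list using urd.peek_latest(). Each file with a later timestamp is then imported in its own session: urd.begin() starts the session with the file's timestamp, urd.latest() looks up the previous import so that the new job can be chained to it, urd.build() runs csvimport, and urd.finish() concludes the Urd session.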

The expression urd.latest(key).joblist.get(-1) might seem cryptic. The joblist is a kind of list that has an additional method get(), which works somewhat like dict.get(). The argument can be either a method name or an index. Index -1 corresponds to the last item in the list. If that item does not exist (i.e. the joblist is empty), get() returns None.
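The behaviour described above can be mimicked with a few lines of plain Python. The class SketchJobList below is a hypothetical stand-in written for this illustration, not the Accelerator's own joblist class; in particular, returning the most recent match for a method name is a choice made for this sketch.

```python
class SketchJobList(list):
    """Hypothetical stand-in for a joblist holding (method, jobid) pairs."""

    def get(self, key, default=None):
        if isinstance(key, int):
            # Index lookup that returns a default instead of raising.
            try:
                return self[key][1]
            except IndexError:
                return default
        # Method name lookup: scan from the end of the list.
        for method, jobid in reversed(self):
            if method == key:
                return jobid
        return default


empty = SketchJobList()
one_job = SketchJobList([('csvimport', 'dev-0')])

print(empty.get(-1))             # nothing imported yet
print(one_job.get(-1))           # last job in the list
print(one_job.get('csvimport'))  # lookup by method name
```

Getting None for an empty joblist is what lets the first import run with previous=None, so that the first dataset starts a new chain.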

Note that the files are required to appear in increasing timestamp order. If they do not, we need to make a decision on how to handle the situation. This is outside the scope of this post.
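The selection of new files, and a simple check of the ordering requirement, can be tried out without Urd. The following is a minimal sketch in plain Python; the helper names new_files and is_increasing are made up for this example, and it assumes that all timestamps are strings in the same zero-padded format, so that string comparison matches date order.

```python
files = (
    ('2009-01', 'yellow_tripdata_2009-01.csv'),
    ('2009-02', 'yellow_tripdata_2009-02.csv'),
    ('2009-03', 'yellow_tripdata_2009-03.csv'),
)


def new_files(files, last_import):
    # Keep only the files whose timestamp sorts after the last imported one.
    return [(ts, filename) for ts, filename in files if ts > last_import]


def is_increasing(files):
    # True if every timestamp is strictly larger than the one before it.
    timestamps = [ts for ts, _ in files]
    return all(a < b for a, b in zip(timestamps, timestamps[1:]))


print(is_increasing(files))
print(new_files(files, '2009-01'))
print(new_files(files, '0'))
```

With '2009-01' as the last imported timestamp, only the February and March files remain. With '0', which sorts before all the timestamps used here, every file is selected.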

Running the build script

As usual, a build script named build_<name> is run using ax run <name>, so if the script is named build_import.py, we do

% ax run import

Inspecting the Urd Database

Now, let us investigate what has been stored in the Urd database. We use the ax curl command to talk directly to the Urd server.

First, let us list all the Urd lists.

% ax curl list
[
    "<user>/import"
]

There is only one Urd list recorded, named import. Let us list all its timestamps:

% ax curl <user>/import/since/0
[
    "2009-01",
    "2009-02",
    "2009-03"
]

(We asked for all timestamps larger than ‘0’. All valid timestamps sort after zero. Replace ‘0’ with an actual timestamp to see only more recent entries.)

We can look at a single entry like this:

% ax curl <user>/import/2009-01
{
    "timestamp": "2009-01",
    "user": "<user>",
    "build": "import",
    "joblist": [
        [
            "csvimport",
            "dev-0"
        ]
    ],
    "deps": {},
    "caption": ""
}

What is returned here, as pretty-printed JSON, is basically the Urd session object that is returned by urd.latest() and its siblings.

(If we want to see only the latest entry, there is a shortcut: ax curl <list>/latest.)

How to Fetch the Imported Dataset Chain from Another Build Script

Now that the import jobs are stored in an Urd list, we can retrieve them in another build script and do some data processing:

def main(urd):
    impjob = urd.peek_latest('import').joblist[-1]
    # "impjob" is now a jobid to the latest import job, use it for example like this
    urd.build('process', source=impjob)

Here we access the last item in the joblist directly, since we want execution to fail immediately if there is no job to be found. (Look at the JSON version of the Urd session object above. What we get from peek_latest() is an Urd session object that has an attribute joblist, which contains the jobids of all jobs in the session. There is only one job in the list, so we fetch the last and only job from the session.)

Note again that we use peek_latest() which is the non-recording version of latest() for use outside of Urd sessions.
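The reason that direct indexing fails fast is ordinary Python list behaviour. In this small sketch a plain empty list stands in for the joblist of a session that holds no jobs; the variable names are made up for the illustration.

```python
empty_joblist = []  # stand-in for the joblist of a session without jobs

try:
    empty_joblist[-1]
    failed = False
except IndexError:
    # Indexing an empty list raises, which would stop a build script here.
    failed = True

print(failed)
```

This is the opposite of the get(-1) call used in the import script, where a missing job is expected for the very first file and None is the desired result.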

Fetch Imported Dataset and Create a new Processing Session

This is a common pattern. We fetch a particular version (timestamp) of the jobs in an Urd list, and process the data stored there. The processing we do is also stored in Urd in another list, but with the same timestamp:

def main(urd):
    key = 'process'
    urd.begin(key)
    urditem = urd.latest('import')
    ts = urditem.timestamp
    impjob = urditem.joblist[-1]
    urd.build('process', source=impjob)
    urd.finish(key, ts)

Note here how the timestamp for the import session is extracted and inserted into the process session. Furthermore, using urd.latest() inside the session will make sure that the dependency from the import session will be stored in the process session.

To see this dependency, we run the build script above and then we take a look at Urd.

% ax curl list
[
    "<user>/import",
    "<user>/process"
]

A new Urd list, process, has been created. What timestamps does it contain?

% ax curl <user>/process/since/0
[
    "2009-03"
]

This is the same timestamp as the latest import session, from which we fetched the dataset chain used as input. The Urd session looks like this:

% ax curl <user>/process/2009-03
{
    "timestamp": "2009-03",
    "build": "process",
    "user": "<user>",
    "joblist": [
        [
            "process",
            "dev-3"
        ]
    ],
    "caption": "",
    "deps": {
        "<user>/import": {
            "caption": "",
            "joblist": [
                [
                    "csvimport",
                    "dev-2"
                ]
            ],
            "timestamp": "2009-03"
        }
    }
}

Here we can see the jobid of the process job, which is dev-3, as well as its dependencies. In this case, the dependency is the import Urd list at timestamp 2009-03, holding the csvimport job dev-2.
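Since the output is plain JSON, the same information can be pulled out programmatically. The sketch below uses only Python's standard json module on a copy of the session shown above, pasted in as a string; how the text is fetched from the Urd server is left out, and the variable names are chosen for this example.

```python
import json

# Copy of the session returned for <user>/process/2009-03 above.
session_json = '''
{
    "timestamp": "2009-03",
    "build": "process",
    "user": "<user>",
    "joblist": [["process", "dev-3"]],
    "caption": "",
    "deps": {
        "<user>/import": {
            "caption": "",
            "joblist": [["csvimport", "dev-2"]],
            "timestamp": "2009-03"
        }
    }
}
'''

session = json.loads(session_json)

# The last entry in the joblist is a [method, jobid] pair.
method, jobid = session['joblist'][-1]

# For each dependency, record its timestamp and the jobid of its last job.
deps = {
    name: (dep['timestamp'], dep['joblist'][-1][1])
    for name, dep in session['deps'].items()
}

print(method, jobid)
print(deps)
```

Here jobid is dev-3, and deps maps <user>/import to the timestamp 2009-03 and the job dev-2, matching what we read off the JSON by eye.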

Using the Accelerator, every compute step is traceable all the way back to its source code and input data!

In the next part, we’ll show how to use Urd to append new columns to an existing dataset chain.

Additional Resources

The Accelerator’s Homepage (exax.org)
The Accelerator on Github/eBay
The Accelerator on PyPI
Reference Manual