Urd Example - Part 2: Let Urd Keep Track of Datasets
[Last updated 2021-10-12]
Urd is the Norse goddess of the past. It is also the name of the Accelerator’s database that keeps track of built jobs and their dependencies.
In this post, we show how to import a list of files and create a dataset chain, and how to store and retrieve the constructed chain using Urd.
The presented solution will automatically handle the case when new files are added to a project. Re-running the build script will import and chain all files that are new since the last run. References in Urd will be updated as well, and previous references will still be available for reproducibility. Data is never lost or overwritten using the Accelerator.
This is the second part of a series of three posts. The other posts are Part 1: Plain Import of a List of Files and Part 3: Appending New Columns to an Existing Dataset Chain.
Background
Urd keeps track of jobs and their dependencies on other jobs. Two key Urd concepts are the session and the list.
- An Urd session is a set of Accelerator jobs that are tied together by a timestamp. A session can contain references to other sessions.
- An Urd list is a list of sessions that are stored in increasing timestamp order and share a common name (or “key”).
Any session can be looked up if we know the timestamp and the name of the list. For example, when importing a list of files, each import is stored in its own session using a unique timestamp, and all sessions are located in one list, for example named import.
It should be noted that Urd timestamps can be either an integer, a timestamp with resolution ranging from months down to microseconds, or a tuple containing both. This makes indexing of sessions very flexible.
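As an illustration, here are some example values of the three timestamp forms described above. The exact string formats are assumptions made for this sketch; any of these values could be used as the timestamp of an Urd session.

# Illustrative (made-up) examples of the three Urd timestamp forms.
examples = (
    3,                             # a plain integer
    '2009-01',                     # month resolution
    '2009-01-15 12:30:45.000001',  # microsecond resolution
    ('2009-01', 3),                # a tuple of a timestamp and an integer
)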
A Build Script for Importing Files and Storing References in Urd
We want to import these files in date order and create a dataset chain.
files = (
    ('2009-01', 'yellow_tripdata_2009-01.csv'),
    ('2009-02', 'yellow_tripdata_2009-02.csv'),
    ('2009-03', 'yellow_tripdata_2009-03.csv'),
)
The difference from the previous post is that here, each file has a timestamp. Urd will associate each timestamp with the session of jobs used for that particular import. Thus, we can ask Urd to return a session containing the state of any import or data processing operation, the way it looked at any particular instant in time. This is extremely powerful!
Here is a complete build script that will import and chain the files in increasing time order, as well as storing sessions with references to all jobs together with their timestamp in Urd.
1  def main(urd):
2      key = 'import'
3      last_import = urd.peek_latest(key).timestamp
4      for ts, filename in files:
5          if ts > last_import:
6              urd.begin(key, ts)
7              previous = urd.latest(key).joblist.get(-1)
8              urd.build('csvimport', filename=filename, previous=previous)
9              urd.finish(key)
Detailed explanation:
- We start with line 8. This is the line that does the actual importing of the file. The method needs to know the name of the file to import, and also, since we want the datasets to be chained, we provide a previous that points to the previous import.
- Then, lines 6 and 9 define an Urd session for this import. The session is associated with a key and a timestamp. The key binds the imports of all the files together into one Urd list, and the timestamp is used to identify the individual sessions.
- Line 7 finds the reference to the previous import job. This is done by asking Urd for the latest job that is associated with the given key. If there is no such job, previous will be set to None. (See below for an explanation of the get(-1).) Note that urd.latest instructs Urd “under the hood” to attach this dependency to the Urd session. Therefore, it can only be issued inside a running Urd session.
This concludes the actual Urd session and importing. In addition,
- on line 3, early in the script, we ask Urd for the latest import so far. This will be our starting point. (We cannot use urd.latest since we are not recording an Urd session. Instead we use urd.peek_latest, which is the same thing, but without dependency recording.)
- lines 4 and 5 loop over all files and timestamps, and issue an Urd session with an import only for those files that have not been imported on a previous run.
The joblist.get(-1) on line 7 might seem cryptic. The joblist is a kind of list that has an additional method get(), which works somewhat like dict.get(). The argument can be either a method name or an index. Index -1 corresponds to the last item in the list. If this does not exist (i.e. the joblist is empty), get will return None.
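As a small sketch of this behaviour, the following build script (run after the import above) prints what get() returns for an index and for a method name; the lookup by the name csvimport is an assumption based on the description above.

def main(urd):
    jl = urd.peek_latest('import').joblist
    print(jl.get(-1))           # the last job in the list, or None if the list is empty
    print(jl.get('csvimport'))  # a job built by the csvimport method, or None if there is none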
Note that the files are required to appear in increasing timestamp order. If they do not, we need to make a decision on how to handle the situation, which is out of scope for this post.
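To see the incremental behaviour described in the introduction, a new month can simply be appended to the files tuple (the 2009-04 filename below is made up to follow the same pattern). On the next run, only this file is imported, it is chained on top of the 2009-03 import, and a new session with timestamp 2009-04 is added to the import list.

files = (
    ('2009-01', 'yellow_tripdata_2009-01.csv'),
    ('2009-02', 'yellow_tripdata_2009-02.csv'),
    ('2009-03', 'yellow_tripdata_2009-03.csv'),
    ('2009-04', 'yellow_tripdata_2009-04.csv'),  # appended later
)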
Running the Build Script
As usual, a build script named build_<name> is run using ax run <name>, so if the script is named build_import.py, we do
% ax run import
Inspecting the Urd Database
Now, let us investigate what has been stored in the Urd database. We use the ax urd command to talk directly to the Urd server.
First, let us list all the Urd lists.
% ax urd
<user>/import
There is only one Urd list recorded, named import. Let us list all its timestamps:
% ax urd import/since/0
2009-01
2009-02
2009-03
(We asked for all timestamps larger than ‘0’. All valid timestamps sort after zero. Replace ‘0’ with an actual timestamp to see only more recent entries.)
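The same listing can also be obtained from inside a build script. The sketch below assumes that the build-script Urd object exposes a since(key, timestamp) lookup mirroring the /since/ endpoint used above; check the Reference Manual for the exact call.

def main(urd):
    # Assumption: urd.since(key, timestamp) returns the timestamps in the
    # list that are newer than the given one, like the /since/ URL above.
    for ts in urd.since('import', '0'):
        print(ts)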
We can look at a single entry like this:
% ax urd import/2009-01
timestamp: 2009-01
caption :
deps :
JobList(
[ 0] csvimport : dev-0
)
What is returned here, in pretty printed JSON, is basically the Urd session object that is returned by urd.latest() and its siblings. (If we want to see only the latest entry, there is a shortcut: ax urd <list>.)
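The same session can also be fetched from a build script. The sketch below assumes that urd.peek(key, timestamp), the non-recording lookup for a specific timestamp, works like urd.peek_latest, and that deps is reachable as an attribute just like timestamp and joblist.

def main(urd):
    # Assumption: urd.peek(key, timestamp) fetches the session stored under
    # that exact timestamp, without recording any dependency.
    session = urd.peek('import', '2009-01')
    print(session.timestamp)  # 2009-01
    print(session.joblist)    # the csvimport job recorded in this session
    print(session.deps)       # dependencies on other Urd sessions (empty here)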
How to Fetch the Imported Dataset Chain from Another Build Script
Now that the import jobs are stored in an Urd list, we can retrieve them in another build script and do some data processing:
def main(urd):
    impjob = urd.peek_latest('import').joblist[-1]
    # "impjob" is now a jobid to the latest import job, use it for example like this
    urd.build('process', source=impjob)
Here we access the last item in the joblist directly, since we want execution to fail immediately if there is no job to be found. (Look at the JSON version of the Urd session object above. What we get from peek_latest is an Urd session object, which has an attribute joblist that contains jobids to all jobs in the session. There is only one job in the list, so we fetch the last and only job from the session.)
Note again that we use peek_latest(), which is the non-recording version of latest(), for use outside of Urd sessions.
Fetch Imported Dataset and Create a New Processing Session
This is a common pattern. We fetch a particular version (timestamp) of the jobs in an Urd list, and process the data stored there. The processing we do is also stored in Urd in another list, but with the same timestamp:
def main(urd):
    key = 'process'
    urd.begin(key)
    urditem = urd.latest('import')
    ts = urditem.timestamp
    impjob = urditem.joblist[-1]
    urd.build('process', source=impjob)
    urd.finish(key, ts)
Note here how the timestamp for the import session is extracted and inserted into the process session. Furthermore, using urd.latest() inside the session will make sure that the dependency on the import session is stored in the process session.
To see this dependency, we run the build script above and then we take a look at Urd.
% ax urd
<user>/import
<user>/process
A new Urd list process has been created. What timestamps does it contain?
% ax urd process/since/0
2009-03
This is the same timestamp as the last import session, from which we fetched the dataset chain used as input. The Urd session:
% ax urd process/2009-03
timestamp: 2009-03
caption :
deps : <user>/import/2009-03
JobList(
[ 0] process : dev-3
)
Here we can see the jobid of the process job, which is dev-3, as well as its dependencies. In this case, the dependency is the import Urd list at timestamp 2009-03, holding the csvimport job dev-2.
Using the Accelerator, every compute step is traceable all the way back to its source code and input data!
In the next part, we’ll show how to use Urd to append new columns to an existing dataset chain.
Additional Resources
The Accelerator’s Homepage (exax.org)
The Accelerator on Github/exaxorg
The Accelerator on PyPI
Reference Manual