Control flow

The exchange rate pipeline in the 04_install project had a simple linear workflow. handoff can also express more complex workflow logic with foreach and fork.

foreach

Sometimes you may want to repeat the same task for different entities, such as accounts. Once a command produces a list of entity keys (for example, account IDs, one per line), handoff can repeat tasks for each key in a for-each loop.

The 05_foreach project shows the structure of the foreach control flow:

> cat 05_foreach/project.yml
tasks:
- name: generate_files
  pipeline:
  - command: cat
    args: files/in.txt
  - foreach:
    - name: make_id_files
      pipeline:
      - command: touch
        args: "artifacts/out_{{ _line }}.txt"

- name: verify_result
  pipeline:
  - command: ls
    args: "artifacts/out_*.txt"
  - command: wc
    args: -l

The first task, generate_files, runs cat to write out the list of integers (1, 2, 3, 4, 5) stored in files/in.txt. The second command in its pipeline, foreach, receives each ID, one line at a time, in the _line variable. Under foreach, the sub-task make_id_files runs once per ID and creates artifacts/out_<ID>.txt with touch.
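handoff's own implementation is not shown here, so the following is only a hypothetical Python sketch of the semantics described above: run the upstream command, then run the sub-pipeline once per non-empty output line, with that line substituted for the placeholder. The function name `run_foreach` and the plain string replacement of `{{ _line }}` (instead of whatever templating handoff actually uses) are assumptions for illustration.

```python
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import List, Sequence


def run_foreach(upstream: Sequence[str], sub_args: Sequence[str]) -> List[int]:
    """Hypothetical foreach: run `upstream`, then run `sub_args` once per
    non-empty stdout line, replacing "{{ _line }}" with that line.
    Returns the exit code of each sub-command, in order."""
    result = subprocess.run(upstream, capture_output=True, text=True, check=True)
    codes = []
    for line in result.stdout.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines instead of running with an empty ID
        args = [arg.replace("{{ _line }}", line) for arg in sub_args]
        codes.append(subprocess.run(args).returncode)
    return codes


# Demo mirroring 05_foreach: print IDs 1..5, then "touch" one file per ID.
with tempfile.TemporaryDirectory() as tmp:
    ids = [sys.executable, "-c", "for i in range(1, 6): print(i)"]
    touch = [
        sys.executable,
        "-c",
        "import pathlib, sys; pathlib.Path(sys.argv[1]).touch()",
        str(Path(tmp) / "out_{{ _line }}.txt"),
    ]
    codes = run_foreach(ids, touch)
    created = sorted(p.name for p in Path(tmp).glob("out_*.txt"))

print(codes, created)
```

Python executables stand in for cat and touch so the sketch runs the same way on any machine with Python. Like the log below, it runs the sub-pipelines one after another.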

The second task, verify_result, counts the output files generated by make_id_files by piping the output of ls into wc -l.

Now let’s run:

> handoff run local -p 05_foreach -w workspace_05
[2021-04-07 05:16:27,467] [ WARNING] - 05_foreach/.secrets/secrets.yml does not exsist - (admin.py:223)
[2021-04-07 05:16:27,598] [    INFO] - Found credentials in shared credentials file: ~/.aws/credentials - (credentials.py:1223)
[2021-04-07 05:16:27,957] [ WARNING] - Environment variable HO_BUCKET is not set. Remote file read/write will fail. - (admin.py:132)
[2021-04-07 05:16:27,961] [    INFO] - Job started at 2021-04-07 05:16:27.961542 - (__init__.py:178)
[2021-04-07 05:16:27,961] [    INFO] - Running pipeline generate_files - (operators.py:194)
[2021-04-07 05:16:27,965] [    INFO] - Running foreach loop - (operators.py:209)
[2021-04-07 05:16:27,967] [    INFO] - Running pipeline make_id_files_1 - (operators.py:194)
[2021-04-07 05:16:27,973] [    INFO] - Checking return code of pid 5063 - (operators.py:263)
[2021-04-07 05:16:27,975] [    INFO] - Running pipeline make_id_files_2 - (operators.py:194)
[2021-04-07 05:16:27,982] [    INFO] - Checking return code of pid 5067 - (operators.py:263)
.
.
.
[2021-04-07 05:16:28,000] [    INFO] - Running pipeline make_id_files_5 - (operators.py:194)
[2021-04-07 05:16:28,007] [    INFO] - Checking return code of pid 5079 - (operators.py:263)
[2021-04-07 05:16:28,012] [    INFO] - Checking return code of pid 5061 - (operators.py:263)
[2021-04-07 05:16:28,012] [    INFO] - Pipeline generate_files exited with code 0 - (task.py:32)
[2021-04-07 05:16:28,012] [    INFO] - Running pipeline verify_result - (operators.py:194)
[2021-04-07 05:16:28,024] [    INFO] - Checking return code of pid 5085 - (operators.py:263)
[2021-04-07 05:16:28,024] [    INFO] - Checking return code of pid 5087 - (operators.py:263)
[2021-04-07 05:16:28,025] [    INFO] - Pipeline verify_result exited with code 0 - (task.py:32)
[2021-04-07 05:16:28,025] [    INFO] - Job ended at 2021-04-07 05:16:28.025819 - (__init__.py:189)
[2021-04-07 05:16:28,025] [    INFO] - Processed in 0:00:00.064277 - (__init__.py:191)

You can verify that five output files were created:

-rw-rw-r-- 1 ubuntu ubuntu 0 Apr  7 05:16 workspace_05/artifacts/out_1.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Apr  7 05:16 workspace_05/artifacts/out_2.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Apr  7 05:16 workspace_05/artifacts/out_3.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Apr  7 05:16 workspace_05/artifacts/out_4.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Apr  7 05:16 workspace_05/artifacts/out_5.txt
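verify_result only prints the count; it does not fail when the count is wrong. Because handoff checks each process's return code, a check that exits non-zero on a mismatch would make a failure visible. Here is a hypothetical Python sketch of such a check. The function name `verify_outputs` and the "0 on match, 1 otherwise" convention are assumptions, not part of the project.

```python
import tempfile
from pathlib import Path


def verify_outputs(workspace: str, expected: int) -> int:
    """Count artifacts/out_*.txt under `workspace` and return an exit status:
    0 if the count equals `expected`, 1 otherwise."""
    found = list(Path(workspace).glob("artifacts/out_*.txt"))
    print(len(found))  # the same number `ls artifacts/out_*.txt | wc -l` prints
    return 0 if len(found) == expected else 1


# Demo in a throwaway directory laid out like workspace_05.
with tempfile.TemporaryDirectory() as ws:
    artifacts = Path(ws) / "artifacts"
    artifacts.mkdir()
    for i in range(1, 6):
        (artifacts / f"out_{i}.txt").touch()
    status_ok = verify_outputs(ws, expected=5)
    status_bad = verify_outputs(ws, expected=6)
```

As a standalone script, you would finish with `sys.exit(verify_outputs("workspace_05", 5))` so that the status becomes the process exit code.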

fork

The fork command lets you split a pipeline so that the output of one command feeds several downstream pipelines. The 06_fork project extends our previous exchange rates project, 04_install, with a fork.

project.yml looks like:

> cat 06_fork/project.yml
version: 0.3
description: Fetch foreign exchange rates

installs:
- venv: tap
  command: pip install tap-exchangeratehost
- venv: target
  command: pip install target-csv

vars:
- key: base_currency
  value: USD

tasks:
- name: fetch_exchange_rates
  description: Fetch exchange rates
  pipeline:
  - command: tap-exchangeratehost
    args: --config files/tap-config.json
    venv: tap
  - fork:
    - name: wide-format
      pipeline:
      - command: target-csv
        args: --config files/target-config.json
        venv: target
    - name: long-format
      pipeline:
      - command: python3
        args: files/convert_to_long_format.py
      - command: target-csv
        args: --config files/target-config.json
        venv: target

deploy:
  cloud_provider: aws
  cloud_platform: fargate
  resource_group: handoff-etl
  container_image: xxxxxxxxcsv
  task: exchange-rates

schedule:
  target_id: 1
  cron: "0 0 * * ? *"
  envs: []

The difference from 04_install is that two target-csv processes receive the stdout of tap-exchangeratehost. The first branch, wide-format, dumps a CSV just like the 04_install project: the wide-format table lists all the currencies as columns.
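Conceptually, fork behaves like `tee`: every branch receives its own copy of the upstream stdout. The hypothetical sketch below illustrates that data flow. `run_fork` is an illustrative name, not handoff's API. For simplicity, it buffers the upstream output in memory and runs the branches one after another. handoff's log, by contrast, shows both branches starting together.

```python
import subprocess
import sys
from typing import List, Sequence


def run_fork(upstream: Sequence[str], branches: List[Sequence[str]]) -> List[bytes]:
    """Run `upstream` once and give every branch its own copy of its stdout.

    The whole upstream output is buffered in memory and branches run
    sequentially, so this shows the data flow only, not streaming.
    """
    data = subprocess.run(upstream, capture_output=True, check=True).stdout
    outputs = []
    for branch in branches:
        proc = subprocess.run(branch, input=data, capture_output=True, check=True)
        outputs.append(proc.stdout)
    return outputs


# Demo: one upstream command, two branches that treat the same input differently.
upstream = [sys.executable, "-c", "print('usd'); print('eur')"]
upper = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read().upper())"]
count = [sys.executable, "-c", "import sys; print(len(sys.stdin.readlines()))"]
outputs = run_fork(upstream, [upper, count])
print(outputs)
```

In 06_fork the two branches play the roles of `upper` and `count` here: the same tap output is written once as wide CSV and once, after conversion, as long CSV.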

The second branch, long-format, first pipes the same stream through files/convert_to_long_format.py and then dumps it with target-csv. The output has date, symbol, and rate columns.
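The contents of files/convert_to_long_format.py are not shown in this project listing. What follows is a hypothetical sketch of such a converter. It assumes the tap emits Singer RECORD messages whose record holds a `date` field plus one field per currency symbol. target-csv validates records against the stream's SCHEMA message, so the sketch also replaces the schema. The schema, the `key_properties` value, and the sample stream name are all assumptions.

```python
import json
from typing import Iterable, Iterator, List

# Assumed long-format schema: one row per (date, symbol) pair.
LONG_SCHEMA = {
    "type": "object",
    "properties": {
        "date": {"type": "string"},
        "symbol": {"type": "string"},
        "rate": {"type": "number"},
    },
}


def to_long(record: dict, date_key: str = "date") -> List[dict]:
    """Turn one wide row {"date": d, "AED": 3.67, ...} into one row per symbol."""
    date = record.get(date_key)
    return [
        {"date": date, "symbol": symbol, "rate": rate}
        for symbol, rate in record.items()
        if symbol != date_key
    ]


def convert(lines: Iterable[str]) -> Iterator[str]:
    """Rewrite Singer messages (one JSON object per line) from wide to long."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        msg = json.loads(line)
        if msg.get("type") == "SCHEMA":
            # Swap in the long schema so the target accepts the new rows.
            yield json.dumps(dict(msg, schema=LONG_SCHEMA, key_properties=["date", "symbol"]))
        elif msg.get("type") == "RECORD":
            for row in to_long(msg["record"]):
                yield json.dumps(dict(msg, record=row))
        else:
            # STATE and any other message types pass through unchanged.
            yield line


# Demo with hand-written sample messages (the stream name is made up).
sample = [
    '{"type": "SCHEMA", "stream": "exchange_rate", "schema": {}, "key_properties": ["date"]}',
    '{"type": "RECORD", "stream": "exchange_rate", "record": '
    '{"date": "2021-03-31T00:00:00Z", "AED": 3.6732, "AFN": 77.250007}}',
    '{"type": "STATE", "value": {}}',
]
converted = list(convert(sample))
for out in converted:
    print(out)
```

To use it as a pipeline stage, the script's entry point would read standard input, e.g. `for out in convert(sys.stdin): print(out, flush=True)`.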

Let’s verify. First, install the workspace:

> handoff workspace install -p 06_fork -w workspace_06
[2021-04-07 05:16:28,607] [    INFO] - Found credentials in shared credentials file: ~/.aws/credentials - (credentials.py:1223)
Requirement already satisfied: wheel in ./tap/lib/python3.6/site-packages (0.36.2)
Collecting tap-exchangeratehost
  Using cached tap_exchangeratehost-0.1.0-py3-none-any.whl (8.2 kB)
Collecting requests>=2.23.0
  Using cached requests-2.25.1-py2.py3-none-any.whl (61 kB)
Collecting singer-python>=5.3.0
  Using cached singer_python-5.12.1-py3-none-any.whl
Collecting urllib3<1.27,>=1.21.1
  Using cached urllib3-1.26.4-py2.py3-none-any.whl (153 kB)
.
.
.
  Using cached pytzdata-2020.1-py2.py3-none-any.whl (489 kB)
Collecting python-dateutil
  Using cached python_dateutil-2.8.1-py2.py3-none-any.whl (227 kB)
Collecting six>=1.5
  Using cached six-1.15.0-py2.py3-none-any.whl (10 kB)
Collecting pytz
  Using cached pytz-2021.1-py2.py3-none-any.whl (510 kB)
Installing collected packages: six, pytz, tzlocal, pytzdata, python-dateutil, simplejson, pendulum, singer-python, jsonschema, target-csv
Successfully installed jsonschema-2.6.0 pendulum-1.2.0 python-dateutil-2.8.1 pytz-2021.1 pytzdata-2020.1 simplejson-3.11.1 singer-python-2.1.4 six-1.15.0 target-csv-0.3.0 tzlocal-2.1
sucess

Now let’s run:

> handoff run local -p 06_fork -w workspace_06 -v start_date=$(date -I -d "-7 day")
[2021-04-07 05:16:43,829] [ WARNING] - 06_fork/.secrets/secrets.yml does not exsist - (admin.py:223)
[2021-04-07 05:16:43,961] [    INFO] - Found credentials in shared credentials file: ~/.aws/credentials - (credentials.py:1223)
[2021-04-07 05:16:44,328] [    INFO] - Job started at 2021-04-07 05:16:44.327973 - (__init__.py:178)
[2021-04-07 05:16:44,328] [    INFO] - Running pipeline fetch_exchange_rates - (operators.py:194)
[2021-04-07 05:16:44,332] [    INFO] - Forking the downstream... - (operators.py:217)
[2021-04-07 05:16:44,332] [    INFO] - Running pipeline wide-format - (operators.py:194)
[2021-04-07 05:16:44,342] [    INFO] - Running pipeline long-format - (operators.py:194)
[2021-04-07 05:16:45,666] [    INFO] - Checking return code of pid 5178 - (operators.py:263)
[2021-04-07 05:16:45,707] [    INFO] - Checking return code of pid 5182 - (operators.py:263)
[2021-04-07 05:16:45,795] [    INFO] - Checking return code of pid 5184 - (operators.py:263)
.
.
.
[2021-04-07 05:16:45,870] [    INFO] - Checking return code of pid 5177 - (operators.py:263)
[2021-04-07 05:16:45,870] [    INFO] - Pipeline fetch_exchange_rates exited with code 0 - (task.py:32)
[2021-04-07 05:16:45,870] [    INFO] - Job ended at 2021-04-07 05:16:45.870886 - (__init__.py:189)
[2021-04-07 05:16:45,870] [    INFO] - Processed in 0:00:01.542913 - (__init__.py:191)
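The start_date value above comes from `date -I -d "-7 day"`, which relies on GNU date. The BSD date shipped with macOS does not accept that relative-date syntax; there, `date -v-7d +%F` is the usual equivalent. As a portable alternative, here is a small Python sketch that prints the same ISO date. The function name and the script name in the usage note are hypothetical.

```python
from datetime import date, timedelta
from typing import Optional


def start_date(days_back: int = 7, today: Optional[date] = None) -> str:
    """ISO-8601 date (YYYY-MM-DD) `days_back` days before `today`
    (default: the local date), like GNU `date -I -d "-7 day"`."""
    today = today or date.today()
    return (today - timedelta(days=days_back)).isoformat()


print(start_date())
```

Saved as, say, start_date.py, it could replace the shell substitution: `handoff run local -p 06_fork -w workspace_06 -v start_date=$(python3 start_date.py)`.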

The long-format CSV file looks like this:

date,symbol,rate
2021-03-31T00:00:00Z,AED,3.6732
2021-03-31T00:00:00Z,AFN,77.250007

Now that we know how to develop and run the pipeline locally, we will start thinking about how to deploy it to the cloud serverlessly, including how to save configurations to remote storage and fetch them from it. Before that, the next section covers how to set up an AWS account and profile.