Control flow

The exchange rate pipeline in the 04_install project had a simple linear workflow. handoff can also implement more complex workflow logic with foreach and fork.

foreach

Sometimes you may want to repeat the same task for different entities, such as accounts. Once you produce a list of entity keys, such as account IDs, handoff can repeat the task for each key in a for-each loop.

The 05_foreach project shows the structure of the foreach control flow:

> cat 05_foreach/project.yml
tasks:
- name: generate_files
  pipeline:
  - command: cat
    args: files/in.txt
  - foreach:
    - name: make_id_files
      pipeline:
      - command: touch
        args: "artifacts/out_{{ _line }}.txt"

- name: verify_result
  pipeline:
  - command: ls
    args: "artifacts/out_*.txt"
  - command: wc
    args: -l

In the first task, generate_files, the first command (cat) writes out a list of integers (1, 2, 3, 4, 5) from files/in.txt. The second step, foreach, then receives each ID in turn in the _line variable. Under foreach, a sub-task make_id_files is defined to carry out the work with that input.

The second task, called verify_result, verifies the number of output files generated by make_id_files.
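To make the substitution concrete, here is a minimal Python sketch of how each upstream line could be rendered into the args template. It is an illustration only, not handoff's implementation: the function name expand_foreach is hypothetical, and skipping blank lines is an assumption.

```python
from typing import Iterable, List


def expand_foreach(lines: Iterable[str], args_template: str) -> List[str]:
    """Render one argument string per non-empty upstream line.

    Hypothetical helper: it only mimics the "{{ _line }}" substitution
    used in project.yml and does not call handoff.
    """
    rendered = []
    for line in lines:
        value = line.strip()
        if not value:
            continue  # assumption: blank lines are ignored
        rendered.append(args_template.replace("{{ _line }}", value))
    return rendered


# Stand-in for the output of `cat files/in.txt`
upstream_output = "1\n2\n3\n4\n5\n"

commands = [
    "touch " + args
    for args in expand_foreach(
        upstream_output.splitlines(), "artifacts/out_{{ _line }}.txt"
    )
]

for command in commands:
    print(command)
```

Each rendered string corresponds to one run of the make_id_files sub-task, which is why the run log shows make_id_files_1 through make_id_files_5.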

Now let’s run:

> handoff run local -p 05_foreach -w workspace_05
[2020-12-28 22:02:28,868] [ WARNING] - 05_foreach/.secrets/secrets.yml does not exsist - (admin.py:223)
[2020-12-28 22:02:28,988] [    INFO] - Found credentials in shared credentials file: ~/.aws/credentials - (credentials.py:1182)
[2020-12-28 22:02:29,338] [ WARNING] - Environment variable HO_BUCKET is not set. Remote file read/write will fail. - (admin.py:132)
[2020-12-28 22:02:29,342] [    INFO] - Job started at 2020-12-28 22:02:29.342212 - (__init__.py:178)
[2020-12-28 22:02:29,342] [    INFO] - Running pipeline generate_files - (operators.py:193)
[2020-12-28 22:02:29,345] [    INFO] - Running foreach loop - (operators.py:208)
[2020-12-28 22:02:29,346] [    INFO] - Running pipeline make_id_files_1 - (operators.py:193)
[2020-12-28 22:02:29,351] [    INFO] - Checking return code of pid 3072 - (operators.py:262)
[2020-12-28 22:02:29,353] [    INFO] - Running pipeline make_id_files_2 - (operators.py:193)
[2020-12-28 22:02:29,358] [    INFO] - Checking return code of pid 3076 - (operators.py:262)
.
.
.
[2020-12-28 22:02:29,372] [    INFO] - Running pipeline make_id_files_5 - (operators.py:193)
[2020-12-28 22:02:29,378] [    INFO] - Checking return code of pid 3088 - (operators.py:262)
[2020-12-28 22:02:29,381] [    INFO] - Checking return code of pid 3070 - (operators.py:262)
[2020-12-28 22:02:29,381] [    INFO] - Pipeline generate_files exited with code 0 - (task.py:32)
[2020-12-28 22:02:29,381] [    INFO] - Running pipeline verify_result - (operators.py:193)
[2020-12-28 22:02:29,390] [    INFO] - Checking return code of pid 3094 - (operators.py:262)
[2020-12-28 22:02:29,390] [    INFO] - Checking return code of pid 3096 - (operators.py:262)
[2020-12-28 22:02:29,391] [    INFO] - Pipeline verify_result exited with code 0 - (task.py:32)
[2020-12-28 22:02:29,391] [    INFO] - Job ended at 2020-12-28 22:02:29.391370 - (__init__.py:184)
[2020-12-28 22:02:29,391] [    INFO] - Processed in 0:00:00.049158 - (__init__.py:186)

You can verify that 5 output files are created:

-rw-rw-r-- 1 ubuntu ubuntu 0 Dec 28 22:02 workspace_05/artifacts/out_1.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Dec 28 22:02 workspace_05/artifacts/out_2.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Dec 28 22:02 workspace_05/artifacts/out_3.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Dec 28 22:02 workspace_05/artifacts/out_4.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Dec 28 22:02 workspace_05/artifacts/out_5.txt

fork

The fork command lets you fork the pipeline. The 06_fork project shows an example of using fork as an extension of our previous exchange rates project, 04_install.

Its project.yml looks like this:

> cat 06_fork/project.yml
version: 0.3
description: Fetch foreign exchange rates

installs:
- venv: tap
  command: pip install tap-exchangeratesapi
- venv: target
  command: pip install target-csv

vars:
- key: base_currency
  value: USD

tasks:
- name: fetch_exchange_rates
  description: Fetch exchange rates
  pipeline:
  - command: tap-exchangeratesapi
    args: --config files/tap-config.json
    venv: tap
  - fork:
    - name: wide-format
      pipeline:
      - command: target-csv
        args: --config files/target-config.json
        venv: target
    - name: long-format
      pipeline:
      - command: python3
        args: files/convert_to_long_format.py
      - command: target-csv
        args: --config files/target-config.json
        venv: target

deploy:
  cloud_provider: aws
  cloud_platform: fargate
  resource_group: handoff-etl
  container_image: xxxxxxxxv
  task: exchange-rates

schedule:
  target_id: 1
  cron: "0 0 * * ? *"
  envs: []

The difference from 04_install is that there are two target-csv processes receiving the stdout of tap-exchangeratesapi. The first sub-task, wide-format, dumps CSV just like the 04_install project. The output wide-format table has all the currencies listed as columns.

The second sub-task, long-format, converts the records to a long format before dumping CSV. The output has date, symbol, and rate columns.

Let’s verify. First, install the workspace:
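One way to picture what fork does with the upstream output is the following minimal Python sketch. It assumes that every branch receives the same lines; the fork function, the Branch type, and the two stand-in branch functions are hypothetical and are not part of handoff. The sketch buffers the input for simplicity, whereas a real pipeline runner would stream it.

```python
from typing import Callable, Dict, Iterable, List

# A branch takes the upstream lines and returns its own output lines.
Branch = Callable[[List[str]], List[str]]


def fork(upstream: Iterable[str], branches: Dict[str, Branch]) -> Dict[str, List[str]]:
    """Give every branch its own copy of the upstream lines."""
    lines = list(upstream)  # buffered here; an assumption made for brevity
    return {name: branch(list(lines)) for name, branch in branches.items()}


def passthrough(lines: List[str]) -> List[str]:
    """Stand-in for a branch that writes the records unchanged."""
    return lines


def tag_long(lines: List[str]) -> List[str]:
    """Stand-in for a branch that transforms records before writing."""
    return ["long:" + line for line in lines]


results = fork(
    ["rec1", "rec2"],
    {"wide-format": passthrough, "long-format": tag_long},
)

for name, output in results.items():
    print(name, output)
```

The point of the sketch is that both branches see the complete upstream output independently, so adding a second branch does not change what the first one receives.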

> handoff workspace install -p 06_fork -w workspace_06
[2020-12-28 22:02:29,961] [    INFO] - Found credentials in shared credentials file: ~/.aws/credentials - (credentials.py:1182)
Requirement already satisfied: wheel in ./tap/lib/python3.6/site-packages (0.36.2)
Collecting tap-exchangeratesapi
  Using cached tap_exchangeratesapi-0.1.1-cp36-none-any.whl
Collecting backoff==1.3.2
  Using cached backoff-1.3.2-cp36-none-any.whl
Collecting requests==2.21.0
  Using cached requests-2.21.0-py2.py3-none-any.whl (57 kB)
Collecting singer-python==5.3.3
  Using cached singer_python-5.3.3-cp36-none-any.whl
.
.
.
  Using cached six-1.15.0-py2.py3-none-any.whl (10 kB)
Collecting pytzdata
  Using cached pytzdata-2020.1-py2.py3-none-any.whl (489 kB)
Collecting tzlocal
  Using cached tzlocal-2.1-py2.py3-none-any.whl (16 kB)
Collecting pytz
  Using cached pytz-2020.5-py2.py3-none-any.whl (510 kB)
Installing collected packages: six, pytz, tzlocal, pytzdata, python-dateutil, simplejson, pendulum, singer-python, jsonschema, target-csv
Successfully installed jsonschema-2.6.0 pendulum-1.2.0 python-dateutil-2.8.1 pytz-2020.5 pytzdata-2020.1 simplejson-3.11.1 singer-python-2.1.4 six-1.15.0 target-csv-0.3.0 tzlocal-2.1
sucess

Now let’s run:

> handoff run local -p 06_fork -w workspace_06 -v start_date=$(date -I -d "-7 day")
[2020-12-28 22:02:44,338] [ WARNING] - 06_fork/.secrets/secrets.yml does not exsist - (admin.py:223)
[2020-12-28 22:02:44,460] [    INFO] - Found credentials in shared credentials file: ~/.aws/credentials - (credentials.py:1182)
[2020-12-28 22:02:44,812] [    INFO] - Job started at 2020-12-28 22:02:44.812863 - (__init__.py:178)
[2020-12-28 22:02:44,813] [    INFO] - Running pipeline fetch_exchange_rates - (operators.py:193)
[2020-12-28 22:02:44,815] [    INFO] - Forking the downstream... - (operators.py:216)
[2020-12-28 22:02:44,816] [    INFO] - Running pipeline wide-format - (operators.py:193)
[2020-12-28 22:02:44,822] [    INFO] - Running pipeline long-format - (operators.py:193)
[2020-12-28 22:02:45,625] [    INFO] - Checking return code of pid 3187 - (operators.py:262)
[2020-12-28 22:02:45,668] [    INFO] - Checking return code of pid 3191 - (operators.py:262)
[2020-12-28 22:02:45,668] [    INFO] - Checking return code of pid 3193 - (operators.py:262)
.
.
.
[2020-12-28 22:02:45,686] [    INFO] - Checking return code of pid 3186 - (operators.py:262)
[2020-12-28 22:02:45,687] [    INFO] - Pipeline fetch_exchange_rates exited with code 0 - (task.py:32)
[2020-12-28 22:02:45,687] [    INFO] - Job ended at 2020-12-28 22:02:45.687229 - (__init__.py:184)
[2020-12-28 22:02:45,687] [    INFO] - Processed in 0:00:00.874366 - (__init__.py:186)

The long format CSV file looks like this:

date,symbol,rate
2020-12-21T00:00:00Z,CAD,1.2885895014
2020-12-21T00:00:00Z,HKD,7.7530600509
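The wide-to-long conversion itself can be sketched in a few lines of Python. This is not the content of files/convert_to_long_format.py, which is not shown here; the function wide_to_long and the shape of the input record (a date key plus one key per currency) are assumptions made for illustration.

```python
import csv
import io
from typing import Dict, Iterator, Tuple


def wide_to_long(
    record: Dict[str, object], date_key: str = "date"
) -> Iterator[Tuple[object, str, object]]:
    """Yield one (date, symbol, rate) row per currency column.

    Hypothetical helper: assumes every key other than date_key
    is a currency symbol mapped to its rate.
    """
    date = record[date_key]
    for symbol, rate in record.items():
        if symbol == date_key:
            continue
        yield (date, symbol, rate)


# Assumed wide-format record: one column per currency
wide_record = {
    "date": "2020-12-21T00:00:00Z",
    "CAD": 1.2885895014,
    "HKD": 7.7530600509,
}

buffer = io.StringIO()
writer = csv.writer(buffer, lineterminator="\n")
writer.writerow(["date", "symbol", "rate"])
writer.writerows(wide_to_long(wide_record))
long_csv = buffer.getvalue()

print(long_csv)
```

With the sample record above, the printed CSV has the same header and two rows as the long-format excerpt.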

Now that we know how to develop and run the pipeline locally, we will gradually start thinking about how to deploy it to the cloud in a serverless fashion. We will learn how to save configurations to remote storage and fetch them from it. Before doing that, the next section covers how to set up an AWS account and profile.