## Control flow

The exchange rate pipeline in the 04_install project had a simple linear workflow. handoff can also implement more complex workflow logic with foreach and fork.

### foreach

Sometimes you may want to repeat the same task for different entities, such as accounts. Once you produce a list of entity keys, such as account IDs, handoff can repeat tasks for each key in a for-each loop.

The 05_foreach project shows the structure of the foreach control flow:

```shell
> cat 05_foreach/project.yml
```

```shell
tasks:
- name: generate_files
  pipeline:
  - command: cat
    args: files/in.txt
  - foreach:
    - name: make_id_files
      pipeline:
      - command: touch
        args: "artifacts/out_{{ _line }}.txt"
- name: verify_result
  pipeline:
  - command: ls
    args: "artifacts/out_*.txt"
  - command: wc
    args: -l
```

The first task, generate_files, writes out a list of integers (1, 2, 3, 4, 5) by running `cat` on files/in.txt. The next command in its pipeline, `foreach`, receives each line of that output, one at a time, in the `_line` variable. Under foreach, a sub-task called make_id_files carries out the work for each input; here it runs `touch` to create an empty file named after the ID. The second task, verify_result, checks the number of output files generated by make_id_files by piping the `ls` listing into `wc -l`.

Now let's run:

```shell
> handoff run local -p 05_foreach -w workspace_05
```

```shell
[2021-04-07 05:16:27,467] [ WARNING] - 05_foreach/.secrets/secrets.yml does not exsist - (admin.py:223)
[2021-04-07 05:16:27,598] [ INFO] - Found credentials in shared credentials file: ~/.aws/credentials - (credentials.py:1223)
[2021-04-07 05:16:27,957] [ WARNING] - Environment variable HO_BUCKET is not set. Remote file read/write will fail. - (admin.py:132)
[2021-04-07 05:16:27,961] [ INFO] - Job started at 2021-04-07 05:16:27.961542 - (__init__.py:178)
[2021-04-07 05:16:27,961] [ INFO] - Running pipeline generate_files - (operators.py:194)
[2021-04-07 05:16:27,965] [ INFO] - Running foreach loop - (operators.py:209)
[2021-04-07 05:16:27,967] [ INFO] - Running pipeline make_id_files_1 - (operators.py:194)
[2021-04-07 05:16:27,973] [ INFO] - Checking return code of pid 5063 - (operators.py:263)
[2021-04-07 05:16:27,975] [ INFO] - Running pipeline make_id_files_2 - (operators.py:194)
[2021-04-07 05:16:27,982] [ INFO] - Checking return code of pid 5067 - (operators.py:263)
.
.
.
[2021-04-07 05:16:28,000] [ INFO] - Running pipeline make_id_files_5 - (operators.py:194)
[2021-04-07 05:16:28,007] [ INFO] - Checking return code of pid 5079 - (operators.py:263)
[2021-04-07 05:16:28,012] [ INFO] - Checking return code of pid 5061 - (operators.py:263)
[2021-04-07 05:16:28,012] [ INFO] - Pipeline generate_files exited with code 0 - (task.py:32)
[2021-04-07 05:16:28,012] [ INFO] - Running pipeline verify_result - (operators.py:194)
[2021-04-07 05:16:28,024] [ INFO] - Checking return code of pid 5085 - (operators.py:263)
[2021-04-07 05:16:28,024] [ INFO] - Checking return code of pid 5087 - (operators.py:263)
[2021-04-07 05:16:28,025] [ INFO] - Pipeline verify_result exited with code 0 - (task.py:32)
[2021-04-07 05:16:28,025] [ INFO] - Job ended at 2021-04-07 05:16:28.025819 - (__init__.py:189)
[2021-04-07 05:16:28,025] [ INFO] - Processed in 0:00:00.064277 - (__init__.py:191)
```

You can verify that 5 output files were created:

```shell
-rw-rw-r-- 1 ubuntu ubuntu 0 Apr 7 05:16 workspace_05/artifacts/out_1.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Apr 7 05:16 workspace_05/artifacts/out_2.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Apr 7 05:16 workspace_05/artifacts/out_3.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Apr 7 05:16 workspace_05/artifacts/out_4.txt
-rw-rw-r-- 1 ubuntu ubuntu 0 Apr 7 05:16 workspace_05/artifacts/out_5.txt
```

### fork
The fork command lets you split a pipeline so that the same upstream output feeds more than one downstream pipeline. The 06_fork project shows an example of using fork as an extension of our previous exchange rates project, 04_install. Its project.yml looks like this:

```shell
> cat 06_fork/project.yml
```

```shell
version: 0.3
description: Fetch foreign exchange rates

installs:
- venv: tap
  command: pip install tap-exchangeratehost
- venv: target
  command: pip install target-csv

vars:
- key: base_currency
  value: USD

tasks:
- name: fetch_exchange_rates
  description: Fetch exchange rates
  pipeline:
  - command: tap-exchangeratehost
    args: --config files/tap-config.json
    venv: tap
  - fork:
    - name: wide-format
      pipeline:
      - command: target-csv
        args: --config files/target-config.json
        venv: target
    - name: long-format
      pipeline:
      - command: python3
        args: files/convert_to_long_format.py
      - command: target-csv
        args: --config files/target-config.json
        venv: target

deploy:
  cloud_provider: aws
  cloud_platform: fargate
  resource_group: handoff-etl
  container_image: xxxxxxxxcsv
  task: exchange-rates

schedule:
  target_id: 1
  cron: "0 0 * * ? *"
  envs: []
```

The difference from 04_install is that two target-csv instances receive the stdout of tap-exchangeratehost. The first forked pipeline, wide-format, writes CSV just like the 04_install project: the wide-format table lists every currency as a column. The second, long-format, first converts the records to a long format with files/convert_to_long_format.py and then writes CSV. Its output has date, symbol, and rate columns.

Let's verify.
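Before we do, note that the project's files/convert_to_long_format.py is not listed here. To give a feel for what such a step does, below is a minimal, hypothetical sketch of a wide-to-long filter, not the actual script. It assumes that the tap writes Singer messages (one JSON object per line) to stdout, that each RECORD carries a `date` field plus one numeric field per currency, and that target-csv needs a SCHEMA message describing the new columns. The names `to_long`, `LONG_SCHEMA`, and `main` are made up for illustration.

```python
import json
import sys
from typing import Iterable, Iterator

# Hypothetical long-format schema (an assumption; the real script may differ).
LONG_SCHEMA = {
    "type": "object",
    "properties": {
        "date": {"type": "string", "format": "date-time"},
        "symbol": {"type": "string"},
        "rate": {"type": ["null", "number"]},
    },
}


def to_long(lines: Iterable[str]) -> Iterator[dict]:
    """Turn wide-format Singer messages into long-format ones."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        msg = json.loads(line)
        kind = msg.get("type")
        if kind == "SCHEMA":
            # Replace the wide schema (one column per currency) with the long one.
            yield {
                "type": "SCHEMA",
                "stream": msg["stream"],
                "schema": LONG_SCHEMA,
                "key_properties": ["date", "symbol"],
            }
        elif kind == "RECORD":
            record = msg["record"]
            # Assumption: the date column is named "date"; every other key is a currency.
            date = record["date"]
            for symbol, rate in record.items():
                if symbol == "date":
                    continue
                yield {
                    "type": "RECORD",
                    "stream": msg["stream"],
                    "record": {"date": date, "symbol": symbol, "rate": rate},
                }
        else:
            # Pass STATE and any other message types through unchanged.
            yield msg


def main() -> None:
    # Read Singer messages from stdin and write the converted ones to stdout.
    for out in to_long(sys.stdin):
        print(json.dumps(out), flush=True)


if __name__ == "__main__":
    main()
```

Because a filter like this only reads stdin and writes stdout, it can sit between tap-exchangeratehost and target-csv in the long-format pipeline, the same position the `python3` command occupies in the project.yml above.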
First, install the workspace:

```shell
> handoff workspace install -p 06_fork -w workspace_06
```

```shell
[2021-04-07 05:16:28,607] [ INFO] - Found credentials in shared credentials file: ~/.aws/credentials - (credentials.py:1223)
Requirement already satisfied: wheel in ./tap/lib/python3.6/site-packages (0.36.2)
Collecting tap-exchangeratehost
  Using cached tap_exchangeratehost-0.1.0-py3-none-any.whl (8.2 kB)
Collecting requests>=2.23.0
  Using cached requests-2.25.1-py2.py3-none-any.whl (61 kB)
Collecting singer-python>=5.3.0
  Using cached singer_python-5.12.1-py3-none-any.whl
Collecting urllib3<1.27,>=1.21.1
  Using cached urllib3-1.26.4-py2.py3-none-any.whl (153 kB)
.
.
.
  Using cached pytzdata-2020.1-py2.py3-none-any.whl (489 kB)
Collecting python-dateutil
  Using cached python_dateutil-2.8.1-py2.py3-none-any.whl (227 kB)
Collecting six>=1.5
  Using cached six-1.15.0-py2.py3-none-any.whl (10 kB)
Collecting pytz
  Using cached pytz-2021.1-py2.py3-none-any.whl (510 kB)
Installing collected packages: six, pytz, tzlocal, pytzdata, python-dateutil, simplejson, pendulum, singer-python, jsonschema, target-csv
Successfully installed jsonschema-2.6.0 pendulum-1.2.0 python-dateutil-2.8.1 pytz-2021.1 pytzdata-2020.1 simplejson-3.11.1 singer-python-2.1.4 six-1.15.0 target-csv-0.3.0 tzlocal-2.1
sucess
```

Now let's run:

```shell
> handoff run local -p 06_fork -w workspace_06 -v start_date=$(date -I -d "-7 day")
```

```shell
[2021-04-07 05:16:43,829] [ WARNING] - 06_fork/.secrets/secrets.yml does not exsist - (admin.py:223)
[2021-04-07 05:16:43,961] [ INFO] - Found credentials in shared credentials file: ~/.aws/credentials - (credentials.py:1223)
[2021-04-07 05:16:44,328] [ INFO] - Job started at 2021-04-07 05:16:44.327973 - (__init__.py:178)
[2021-04-07 05:16:44,328] [ INFO] - Running pipeline fetch_exchange_rates - (operators.py:194)
[2021-04-07 05:16:44,332] [ INFO] - Forking the downstream... - (operators.py:217)
[2021-04-07 05:16:44,332] [ INFO] - Running pipeline wide-format - (operators.py:194)
[2021-04-07 05:16:44,342] [ INFO] - Running pipeline long-format - (operators.py:194)
[2021-04-07 05:16:45,666] [ INFO] - Checking return code of pid 5178 - (operators.py:263)
[2021-04-07 05:16:45,707] [ INFO] - Checking return code of pid 5182 - (operators.py:263)
[2021-04-07 05:16:45,795] [ INFO] - Checking return code of pid 5184 - (operators.py:263)
.
.
.
[2021-04-07 05:16:45,870] [ INFO] - Checking return code of pid 5177 - (operators.py:263)
[2021-04-07 05:16:45,870] [ INFO] - Pipeline fetch_exchange_rates exited with code 0 - (task.py:32)
[2021-04-07 05:16:45,870] [ INFO] - Job ended at 2021-04-07 05:16:45.870886 - (__init__.py:189)
[2021-04-07 05:16:45,870] [ INFO] - Processed in 0:00:01.542913 - (__init__.py:191)
```

The long-format CSV file looks like this:

```shell
date,symbol,rate
2021-03-31T00:00:00Z,AED,3.6732
2021-03-31T00:00:00Z,AFN,77.250007
```

Now that we know how to develop and run the pipeline locally, we will gradually start thinking about how to deploy it in the cloud *serverlessly*. We will learn how to save the configurations to, and fetch them from, remote storage. Before doing that, the next section covers how to set up an AWS account and profile.