Extract data, transform it, and load it in parallel to S3 and Postgres — in less than 7 seconds!
Source
yaml
id: api-json-to-postgres
namespace: company.team
tasks:
  # Extract: download users from the REST API
  - id: download
    type: io.kestra.plugin.core.http.Download
    uri: https://gorest.co.in/public/v2/users
  - id: ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.download.uri }}"
    newLine: false
  - id: json
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.ion.uri }}"
  # Transform: add an inserted_at timestamp to every row
  - id: add_column
    type: io.kestra.plugin.graalvm.python.FileTransform
    from: "{{ outputs.json.uri }}"
    script: |
      from datetime import datetime
      logger.info('row: {}', row)
      row['inserted_at'] = datetime.utcnow()
  # Load: write the enriched data to Postgres and S3 in parallel
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: postgres
        type: io.kestra.plugin.core.flow.Sequential
        tasks:
          - id: final_csv
            type: io.kestra.plugin.serdes.csv.IonToCsv
            from: "{{ outputs.add_column.uri }}"
            header: true
          - id: create_table
            type: io.kestra.plugin.jdbc.postgresql.Query
            url: jdbc:postgresql://host.docker.internal:5432/
            username: postgres
            password: qwerasdfyxcv1234
            sql: |
              CREATE TABLE IF NOT EXISTS public.raw_users
                (
                    id            int,
                    name          VARCHAR,
                    email         VARCHAR,
                    gender        VARCHAR,
                    status        VARCHAR,
                    inserted_at   timestamp
                );
          - id: load_data
            type: io.kestra.plugin.jdbc.postgresql.CopyIn
            url: jdbc:postgresql://host.docker.internal:5432/
            username: postgres
            password: qwerasdfyxcv1234
            format: CSV
            from: "{{ outputs.final_csv.uri }}"
            table: public.raw_users
            header: true
      - id: s3
        type: io.kestra.plugin.core.flow.Sequential
        tasks:
          - id: final_json
            type: io.kestra.plugin.serdes.json.IonToJson
            from: "{{ outputs.add_column.uri }}"
          - id: json_to_s3
            type: io.kestra.plugin.aws.s3.Upload
            from: "{{ outputs.final_json.uri }}"
            key: users.json
            bucket: kestraio
            region: "{{ secret('AWS_DEFAULT_REGION') }}"
            accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
            secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
About this blueprint
Tags: SQL, AWS, Kestra
EXTRACT: The workflow downloads user data from an external REST API via an HTTP GET request, stores the raw JSON response in Kestra's internal storage, and converts it to ION with the JsonToIon task. The input sketch below shows how the API URL could be made configurable.
TRANSFORM: The ION file is converted back to JSON and transformed row by row with a Python FileTransform script that adds an inserted_at timestamp column to every record (see the transform sketch below for adding further columns).
LOAD: The enriched data is loaded in parallel: one branch converts it to CSV, creates the public.raw_users table if it does not exist, and bulk-loads it into Postgres with COPY; the other branch converts it to JSON and uploads it to an S3 bucket. The pluginDefaults sketch below shows how the repeated Postgres connection settings could be factored out.
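The extract step points at a single hard-coded endpoint. Below is a minimal sketch of how the URL could instead be exposed as a flow input, so the same blueprint can pull from another API without editing the task; the api_url input name and its default value are illustrative and not part of the blueprint above.
yaml
inputs:
  - id: api_url
    type: STRING
    defaults: https://gorest.co.in/public/v2/users

tasks:
  - id: download
    type: io.kestra.plugin.core.http.Download
    uri: "{{ inputs.api_url }}"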
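The row-by-row transform is not limited to a single field: the FileTransform script can assign any number of columns on each row. The sketch below tags every record with a constant source column in addition to the timestamp; the source column name is illustrative, and the CREATE TABLE statement would need a matching column for the Postgres load to accept it.
yaml
  - id: add_column
    type: io.kestra.plugin.graalvm.python.FileTransform
    from: "{{ outputs.json.uri }}"
    script: |
      from datetime import datetime
      row['inserted_at'] = datetime.utcnow()
      row['source'] = 'gorest'  # illustrative extra column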
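Both Postgres tasks repeat the same connection settings and embed the password in plain text. The sketch below shows how they could be factored out at flow level with pluginDefaults and read from a secret instead; the POSTGRES_PASSWORD secret name is an assumption, and with these defaults in place the url, username, and password lines could be dropped from the create_table and load_data tasks.
yaml
pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://host.docker.internal:5432/
      username: postgres
      password: "{{ secret('POSTGRES_PASSWORD') }}"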