Tutorial: Upload data using a multipart web request job

This tutorial provides step-by-step instructions for using a Web Request job with the multipart functionality to perform a bulk data upload into Salesforce.

For details on the REST API endpoints used in this tutorial, refer to the Salesforce Bulk API 2.0 documentation on creating ingest jobs and retrieving job information.

Prerequisites

Before continuing with this tutorial, ensure you have completed the following prerequisites:

Generate a source file for bulk upload

  1. Open a Snowflake worksheet and execute the following statements to generate a source file in the DATA stage of the LEARN_DEV project:

    USE ROLE LEARN_DEV_DEVELOPER;
    USE SCHEMA LEARN_SW.DEV;
    
    -- generate a source table in LEARN_SW
    CREATE OR REPLACE TABLE INGESTED_ACCOUNTS
    AS
    SELECT '789' AS customExtIdField__c, 'Grand Hotels & Resorts Ltd' AS name,10000 AS NumberOfEmployees
    UNION
    SELECT '890','Express Logistics and Transport',5000
    UNION
    SELECT '901','University of Arizona',1000
    UNION
    SELECT '1350','United Oil & Gas Corp.',20000
    UNION
    SELECT '2690','University of The Terrific',2000;
    
    SELECT * FROM INGESTED_ACCOUNTS;
    
    -- generate the source file in DATA
    COPY INTO @SNOWEAVER.LEARN_DEV.DATA/sfdc/ingested_accounts.csv.gz
    FROM INGESTED_ACCOUNTS
    FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP FIELD_OPTIONALLY_ENCLOSED_BY='"')
    HEADER=TRUE
    OVERWRITE = TRUE
    SINGLE = TRUE ;
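
  2. Optionally, run a LIST command in the same worksheet to confirm that the compressed source file is now present in the DATA stage:

    LIST @SNOWEAVER.LEARN_DEV.DATA/sfdc/;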
    

Import the sfdc_lib macro library

Note

If you’ve already imported the sfdc_lib macro library from previous tutorials, you can skip this section.

  1. Open the LEARN_DEV project in Snoweaver with the LEARN_DEV_DEVELOPER role.

  2. Open the Home page.

  3. Use the Importing / Exporting Resources function to import the following library.

    • Format: On Screen
      Resource Type: Macro Lib
      New record: checked
      Name: sfdc_lib

      YAML Resource Data:

      macros: |-
         {%- macro get_job_info(response,interval,iterations) -%}
            {%- for i in range (iterations) -%}
                  {%-  set result=call_proc('sfdc_get_job_info',response) -%}
                  {%-  set result_dict=json_loads(result) -%}
                  {%- if result_dict.state in  ['JobComplete']  -%}
                     {{ result }}
                     {%- break -%}
                  {%- elif result_dict.state in  ['Aborted','Failed'] -%}
                     {{ raise_exception(result_dict['errorMessage']) }}
                  {%- endif -%}
                  {{- sleep(interval) -}}
                  {%- if loop.last -%}
                     {{- raise_exception('The job did not complete within the specified time limit.') -}}
                  {%- endif -%}
            {%- endfor -%}
         {%- endmacro -%}
      
         {%- macro check_error(status, response) -%}
            {%- if not( 200 <= status <= 299) -%}
               {{ raise_exception(response[0]['message']) }}
            {%- endif -%}
         {%- endmacro -%}
      

Create a bulk upload job

  1. Open the LEARN_DEV project in Snoweaver with the LEARN_DEV_DEVELOPER role.

  2. Open the Home page.

  3. Use the Importing / Exporting Resources function to import the following job.

    Format: On Screen
    Resource Type: Job
    New record: checked
    Name: sfdc_bulk_ingest_object_records

    YAML Resource Data:

    endpoint: https://{{_proj.sfdc_account}}.my.salesforce.com/services/data/v61.0/jobs/ingest/
    external_integration: sfdc_learn_dev_access_integration
    headers: |-
      {
            "Authorization":"Bearer {{_secrets.token}}"
      }
    instance_type: procedure
    macro_libs:
      - sfdc_lib
    payload_format: multipart
    payload_parts:
      - content_type: application/json
        data: '{{_vars.metadata|tojson}}'
        filename: ''
        headers: ''
        name: job
        source_type: raw_data
      - content_type: text/csv
        data: '{{_vars.filepath}}'
        filename: content
        headers: ''
        name: content
        source_type: file
    posthook: '{{ get_job_info(_response,10,10) }}'
    posthook_replace_output: true
    request_type: post
    response_format: json
    save_results_scope: sfdc_job_run_history
    secrets:
      - alias: token
        secret: sfdc_learn_dev_oauth_access_token
    type: web_request
    variables:
      - name: filepath
        type: text
        value: sfdc/ingested_accounts.csv.gz
      - name: metadata
        type: object
        value: |-
          {
              "object" : "Account",
              "externalIdFieldName" : "customExtIdField__c",
              "contentType" : "CSV",
              "operation" : "upsert",
              "lineEnding" : "LF"
          }
    
  4. Open sfdc_bulk_ingest_object_records on the Jobs page.

  5. This job is configured as a multipart Web Request job that uploads the previously generated file to Salesforce together with the specified job metadata.

    ../_images/126.png
  6. The following variables are defined for the job.

    • filepath: represents the relative path of the source file within the DATA stage.

    • metadata: stores metadata related to the ingest job and the uploaded file.

    ../_images/212.png
  7. Review and test the Jinja template for specifying the job endpoint URL.

  8. Review and test the Jinja template for specifying the job headers.

    Warning

    The multipart boundary is generated automatically and should not be specified explicitly in the request headers.

  9. This job’s payload configuration consists of two components:

    • job: carries the metadata for the ingest job, rendered from the metadata variable.

    • content: because the source type is set to File, the Data column holds the relative path (within the DATA stage) of the file to upload.

    ../_images/412.png
  10. Test the template to validate the output.

    ../_images/4b1.png

Test the bulk upload job

  1. Click Preview Request to examine the web request generated by the job.

    ../_images/512.png
  2. Click Make a Test Call to initiate a web request to the specified endpoint URL.

  3. The job will display the Salesforce response in JSON format, confirming that a job has been queued to ingest the uploaded file.

    ../_images/79.png
  4. Click the copy icon next to the response payload and paste the copied content into a text editor; you will need it in the following sections.
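
    For reference, a newly created ingest job typically returns a payload along these lines; the values below are illustrative (the id shown is the example value used later in this tutorial) and some fields are omitted:

    {
        "id" : "750dM000002bDJhQAM",
        "operation" : "upsert",
        "object" : "Account",
        "createdDate" : "2024-01-01T00:00:00.000+0000",
        "state" : "UploadComplete",
        "concurrencyMode" : "Parallel",
        "contentType" : "CSV",
        "apiVersion" : 61.0
    }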

Create a job to retrieve job information

  1. Open the Home page.

  2. Use the Importing / Exporting Resources function to import the following job.

    Format: On Screen
    Resource Type: Job
    New record: checked
    Name: sfdc_get_job_info

    YAML Resource Data:

    endpoint: https://{{_proj.sfdc_account}}.my.salesforce.com/services/data/v61.0/jobs/ingest/{{_vars.response.id}}/
    external_integration: sfdc_learn_dev_access_integration
    headers: |-
      {
            "Authorization":"Bearer {{_secrets.token}}"
      }
    instance_type: procedure
    request_type: get
    response_format: json
    secrets:
      - alias: token
        secret: sfdc_learn_dev_oauth_access_token
    type: web_request
    variables:
      - name: response
        type: object
        value: '{}'
    
  3. Return to the Jobs page.

  4. Open the job sfdc_get_job_info via the sidebar.

  5. This job retrieves the current status of the Salesforce job specified in the response variable.

  6. Update the variable’s test value with the previously copied response payload.

    ../_images/107.png
  7. Review and test the Jinja template for specifying the job endpoint URL.

    ../_images/1111.png

    Note

    You can reference any key from the response variable because its data type is Object (JSON).

  8. Click Make a Test Call to verify the outcome of the ingest job.

    ../_images/127.png
  9. Click Build to build a job instance (see the note after this list for invoking the built instance from a Snowflake worksheet).

  10. Because Salesforce queues and executes ingest jobs asynchronously, a single execution of this job may not return the final status. We will use a macro to poll until the job completes.
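
Note

Once the sfdc_get_job_info instance has been built (step 9 above), it can also be invoked directly from a Snowflake worksheet. The call below is a sketch that assumes the same calling convention as the sfdc_bulk_ingest_object_records call shown later in this tutorial; substitute <response.id> with the ID from your copied response payload:

    USE ROLE LEARN_DEV_DEVELOPER;
    CALL SNOWEAVER.LEARN_DEV.SFDC_GET_JOB_INFO({'id': '<response.id>'});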

Test the macro to poll the job result

  1. Open sfdc_lib on the Macro Libs page.

  2. The get_job_info macro polls the status of the specified Salesforce job and behaves as follows:

    • Waits for a specified interval (in seconds) and continues polling until the maximum iteration limit is reached.

    • If the upload job is not completed within the allowed iterations, an exception is raised.

    • Returns the response payload and terminates the polling loop when the job transitions to ‘JobComplete’.

    • Calls raise_exception and terminates the polling loop when the job transitions to ‘Aborted’ or ‘Failed’.

  3. Copy and paste the code snippet below into the Validator. Substitute <response.id> with the actual ID extracted from the previously copied response payload (e.g. 750dM000002bDJhQAM).

    {% set _response={'id': '<response.id>'} %}
    {{ get_job_info(_response,10,10) }}
    

    Note

    In this context, we specifically require only the id value from the response. Therefore, we construct a simplified payload containing solely the id key.

  4. Click Test Macros to validate the get_job_info macro.

    ../_images/16b.png

Test the post-hook of the bulk upload job

  1. Open sfdc_bulk_ingest_object_records on the Jobs page.

  2. Review the configuration for Enable Post-hook and Macro Libraries.

    ../_images/172.png
  3. Click Make a Test Call again in the Response Handler section to generate a new response for testing the post-hook.

  4. Click Test Template in the Post-Hook section to validate the output.

  5. In the Post-Hook section, the get_job_info macro is invoked to check the outcome of the ingest job, and the job output is replaced with the job information returned by the macro (per the posthook_replace_output setting).

  6. Click Make a Test Call with Posthook in the Post-Hook Handler section to initiate a test call with the configured post-hook.

  7. The job will now use a post-hook to monitor the ingest job and return the final outcome.

    ../_images/192.png

Save job run history

  1. Review the Response Handler section.

  2. Notice that the Save job results option is enabled. This feature, configured with the scope value sfdc_job_run_history, ensures that each job execution is logged.

    ../_images/183.png
  3. Open a Snowflake worksheet and execute the statement below to verify that all previous job run results have been logged in the RESULTS table:

    USE ROLE LEARN_DEV_DEVELOPER;
    SELECT * FROM SNOWEAVER.LEARN_DEV.RESULTS;
    
    ../_images/231.png
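
    To see only the runs captured under this scope, you can filter the table; the SCOPE column name below is an assumption for illustration and may differ in your Snoweaver version:

    -- hypothetical column name; check the actual RESULTS table definition
    SELECT *
    FROM SNOWEAVER.LEARN_DEV.RESULTS
    WHERE SCOPE = 'sfdc_job_run_history';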

Error handling

  1. Return to the sfdc_bulk_ingest_object_records job on the Jobs page.

  2. Update the job variable metadata with the following test value:

    {
       "object" : "Account",
       "externalIdFieldName" : "customExtIdField__c",
       "contentType" : "CSV",
       "operation" : "upsert",
       "lineEnding" : "CRLF"
    }
    
  3. Click Make a Test Call with Posthook to initiate another test call.

  4. The call now triggers an exception because the declared CRLF line ending does not match the LF line endings of the generated source file; the error message is derived from the Salesforce response.

    ../_images/19a.png
  5. Click Build to build a job instance.

  6. Open a Snowflake worksheet.

  7. Execute the statement below to trigger the job and verify the results.

    USE ROLE LEARN_DEV_DEVELOPER;
    
    CALL SNOWEAVER.LEARN_DEV.SFDC_BULK_INGEST_OBJECT_RECORDS(
    'sfdc/ingested_accounts.csv.gz',
    {
       'object' : 'Account',
       'externalIdFieldName' : 'customExtIdField__c',
       'contentType' : 'CSV',
       'operation' : 'upsert',
       'lineEnding' : 'LF'
    });
    
    ../_images/20.png
  8. Execute the statement below to trigger a failed run. The job will return a computation error in this scenario.

    CALL SNOWEAVER.LEARN_DEV.SFDC_BULK_INGEST_OBJECT_RECORDS(
    'sfdc/ingested_accounts.csv.gz',
    {
       'object' : 'Account',
       'externalIdFieldName' : 'customExtIdField__c',
       'contentType' : 'CSV',
       'operation' : 'upsert',
       'lineEnding' : 'CRLF'
    });
    
    ../_images/213.png
  9. For detailed instructions on capturing and displaying fatal error messages from Snoweaver Jobs, refer to Tutorial: Enable logging and capture errors.

Warning

For large data uploads, the macro may poll for a prolonged period, consuming warehouse credits. An XSmall warehouse is generally adequate for these light workloads.
If you do not keep a warehouse continuously active for light, real-time workloads, consider scheduling a generic job to execute the polling macro after the anticipated completion time of the ingest job.
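
For example, such a check could be scheduled with a Snowflake task. The sketch below is illustrative only: the task name, warehouse, and schedule are placeholders, and the direct SFDC_GET_JOB_INFO call stands in for a dedicated generic polling job built around the get_job_info macro:

    -- Illustrative sketch: poll the ingest job on a schedule after the upload.
    -- Task name, warehouse, schedule, and the called procedure are placeholders.
    CREATE OR REPLACE TASK SNOWEAVER.LEARN_DEV.SFDC_POLL_INGEST_JOB
      WAREHOUSE = LEARN_DEV_XS_WH
      SCHEDULE = '60 MINUTE'
    AS
      CALL SNOWEAVER.LEARN_DEV.SFDC_GET_JOB_INFO({'id': '<response.id>'});

    -- Resume the task, then suspend it once the job has reached a final state.
    ALTER TASK SNOWEAVER.LEARN_DEV.SFDC_POLL_INGEST_JOB RESUME;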