Share via


Ingest data from Salesforce

This page describes how to ingest data from Salesforce and load it into Azure Databricks using Lakeflow Connect.

Before you begin

To create an ingestion pipeline, you must meet the following requirements:

  • Your workspace is enabled for Unity Catalog.
  • Serverless compute is enabled for your workspace. See Enable serverless compute.
  • If you plan to create a connection: You have CREATE CONNECTION privileges on the metastore.
  • If you plan to use an existing connection: You have USE CONNECTION privileges or ALL PRIVILEGES on the connection object.
  • You have USE CATALOG privileges on the target catalog.
  • You have USE SCHEMA and CREATE TABLE privileges on an existing schema or CREATE SCHEMA privileges on the target catalog.

To ingest from Salesforce, the following is recommended:

  • Create a Salesforce user that Databricks can use to retrieve data. Make sure that the user has API access and access to all of the objects that you plan to ingest.

Create an ingestion pipeline

Permissions required: USE CONNECTION or ALL PRIVILEGES on a connection.

This step describes how to create the ingestion pipeline. Each ingested table is written to a streaming table with the same name (but all lowercase).

Databricks UI

  1. In the sidebar of the Azure Databricks workspace, click Data Ingestion.

  2. On the Add data page, under Databricks connectors, click Salesforce.

    The Salesforce ingestion wizard opens.

  3. On the Pipeline page of the wizard, enter a unique name for the ingestion pipeline.

  4. In the Destination catalog dropdown, select a catalog. Ingested data and event logs will be written to this catalog.

  5. Select the Unity Catalog connection that stores the credentials required to access Salesforce data.

    If there are no Salesforce connections, click Create connection. You must have the CREATE CONNECTION privilege on the metastore.

  6. Click Create pipeline and continue.

  7. On the Source page, select the tables to ingest, and then click Next.

    If you select All tables, the Salesforce ingestion connector writes all existing and future tables in the source schema to the destination schema. There is a maximum of 250 objects per pipeline.

  8. On the Destination page, select the Unity Catalog catalog and schema to write to.

    If you don't want to use an existing schema, click Create schema. You must have the USE CATALOG and CREATE SCHEMA privileges on the parent catalog.

  9. Click Save pipeline and continue.

  10. (Optional) On the Settings page, click Create schedule. Set the frequency to refresh the destination tables.

  11. (Optional) Set email notifications for pipeline operation success or failure.

  12. Click Save and run pipeline.

Databricks Asset Bundles

This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.

  1. Create a new bundle using the Databricks CLI:

    databricks bundle init
    
  2. Add two new resource files to the bundle:

    • A pipeline definition file (resources/sfdc_pipeline.yml).
    • A workflow file that controls the frequency of data ingestion (resources/sfdc_job.yml).

    The following is an example resources/sfdc_pipeline.yml file:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
    

    The following is an example resources/sfdc_job.yml file:

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Deploy the pipeline using the Databricks CLI:

    databricks bundle deploy
    

Databricks CLI

You can use the following table configuration properties in your pipeline definition to select or deselect specific columns to ingest:

  • include_columns: Optionally specify a list of columns to include for ingestion. If you use this option to explicitly include columns, the pipeline automatically excludes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.
  • exclude_columns: Optionally specify a list of columns to exclude from ingestion. If you use this option to explicitly exclude columns, the pipeline automatically includes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.

To create the pipeline:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

To update the pipeline:

databricks pipelines update --json "<pipeline-definition | json-file-path>"

To get the pipeline definition:

databricks pipelines get "<pipeline-id>"

To delete the pipeline:

databricks pipelines delete "<pipeline-id>"

For more information, you can run:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Example JSON pipeline definition

"ingestion_definition": {

     "connection_name": "<connection-name>",

     "objects": [

       {

         "table": {

           "source_schema": "<source-schema>",

           "source_table": "<source-table>",

           "destination_catalog": "<destination-catalog>",

           "destination_schema": "<destination-schema>",

           "table_configuration": {

             "include_columns": ["<column-a>", "<column-b>", "<column-c>"]

           }

         }

       }

     ]

 }

Start, schedule, and set alerts on your pipeline

You can create a schedule for the pipeline on the pipeline details page.

  1. After the pipeline has been created, revisit the Azure Databricks workspace, and then click Pipelines.

    The new pipeline appears in the pipeline list.

  2. To view the pipeline details, click the pipeline name.

  3. On the pipeline details page, you can schedule the pipeline by clicking Schedule.

  4. To set notifications on the pipeline, click Settings, and then add a notification.

For each schedule that you add to a pipeline, Lakeflow Connect automatically creates a job for it. The ingestion pipeline is a task within the job. You can optionally add more tasks to the job.

Note

When the pipeline runs, you might see two source views for a given table. One view contains the snapshots for formula fields. The other view contains the incremental data pulls for non-formula fields. These views are joined in the destination table.

Example: Ingest two Salesforce objects into separate schemas

The example pipeline definition in this section ingests two Salesforce objects into separate schemas. Multi-destination pipeline support is API-only.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: AccountShare
              destination_catalog: my_catalog_1 # Location of this table
              destination_schema: my_schema_1 # Location of this table
          - table:
              source_schema: objects
              source_table: AccountPartner
              destination_catalog: my_catalog_2 # Location of this table
              destination_schema: my_schema_2 # Location of this table

Example: Ingest one Salesforce object three times

The example pipeline definition in this section ingests a Salesforce object into three different destination tables. Multi-destination pipeline support is API-only.

You can optionally rename a table that you ingest. If you rename a table in your pipeline, it becomes an API-only pipeline, and you can no longer edit the pipeline in the UI.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1	# Location of the pipeline event log
      schema: my_schema_1	# Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_1	# Location of first copy
              destination_schema: my_schema_1	# Location of first copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of second copy
              destination_schema: my_schema_2	# Location of second copy
	        - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of third copy, renamed
              destination_schema: my_schema_2	# Location of third copy, renamed
              destination_table: order_duplicate # Table rename

Additional resources