Streamline BigQuery–Salesforce Integration Using Airflow
Stop relying on costly tools—learn how to automate data sync between BigQuery and Salesforce using Airflow for seamless operations and better ROI.
There are many articles available today that explain how to send data from Salesforce to Google BigQuery. What they often fail to explain, however, is how to send BigQuery data to Salesforce and then log the response back into a BigQuery table for further analysis. In this blog, we show how you can automate this process in a simple manner without using expensive third-party tools.
Source data is often sent in batches. When there is a massive amount of data, it is quite possible that there is a difference between the number of records sent from the source and the number of records received at the target. It is not viable to manually check if the number of records sent from BigQuery matches the row count in Salesforce. We can simplify this using a basic Python script and Apache Airflow.
Sending Data from BigQuery to Salesforce
To start the process, import the Salesforce class from the simple_salesforce module in your Python script. ‘Simple Salesforce’ is a REST API client built for Python.
Connect to Salesforce
Log in to Salesforce or create a Salesforce connection with username, password, and security token details.
from simple_salesforce import Salesforce

sf = Salesforce(username='', password='', security_token='')
Send Data from BigQuery to Salesforce
- Select the fields corresponding to the Salesforce object from the BigQuery table.
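The query below runs through a cursor. One way to obtain that cursor is the DB-API interface shipped with the google-cloud-bigquery library (a minimal sketch; it assumes Application Default Credentials are configured in the environment, and project_id is a placeholder):
from google.cloud import bigquery
from google.cloud.bigquery import dbapi

# Assumes Application Default Credentials are available in the environment
bq_client = bigquery.Client(project='project_id')
conn = dbapi.connect(bq_client)
cursor = conn.cursor()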
query = "SELECT field1, field2, field3 FROM `project_id.dataset_id.table_name`"
cursor.execute(query)
result_set = cursor.fetchall()
- Change the result set to a data frame
import pandas as pd
contactdf = pd.DataFrame(result_set, columns=['AccountNumber__c', 'AccountStatus__c', 'Account_Id__c'])
The column names in the above example belong to the Salesforce Contact object. Populate additional columns of this object in the same way, as required.
Ensure that the order of the columns queried from BigQuery matches the order of the columns in the data frame.
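One way to make the ordering explicit is to select the Salesforce column list on the data frame before converting it (a small sketch using the example columns above):
sf_columns = ['AccountNumber__c', 'AccountStatus__c', 'Account_Id__c']
contactdf = contactdf[sf_columns]  # enforce the expected column order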
- Convert data frame to JSON
JSON conversion is required because the Salesforce bulk call expects the records as a list of JSON-style dictionaries rather than a data frame.
import json
jsondf = contactdf.to_json(orient='records')
contact_json = json.loads(jsondf)
- Load Contact JSON data to Salesforce
x = sf.bulk.Contact.upsert(contact_json, 'Account_Id__c', batch_size=1000, use_serial=True)
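The bulk upsert returns one result entry per record. A quick check such as the sketch below (assuming the usual success and errors keys in each result) helps spot records that Salesforce rejected:
# Collect records that Salesforce reported as failed
failed = [r for r in x if not r.get('success')]
print("Upserted %d records, %d failures" % (len(x) - len(failed), len(failed)))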
Data can be inserted into other Salesforce objects, such as Address and Opportunity, in a similar fashion.
Retrieving Data from Salesforce
Here, data is accessed using SOQL (Salesforce Object Query Language), a SQL-like language used to query data from Salesforce.
First, collect the list of IDs for which data needs to be fetched from Salesforce. Use a unique column in Salesforce for this purpose.
contactdf_for_sf = contactdf['']  # the unique Salesforce column, e.g. the external ID field
IdList = contactdf_for_sf.tolist()
Fetching Data with SOQL
sf.query_all is a method from Simple Salesforce to fetch data using SOQL. In the current scenario, the required columns are pulled from Salesforce record by record, converted to a data frame, and appended to a combined data frame.
Appended_df = pd.DataFrame(columns=['Id', 'npe01__WorkEmail__c', 'AccountId'])
for Id in IdList:
    # Query Salesforce for each value and accumulate the returned records
    sf_contact_data = sf.query_all(
        "SELECT Id, npe01__WorkEmail__c, AccountId FROM Contact WHERE npe01__WorkEmail__c = '%s'" % Id)
    df = pd.DataFrame(sf_contact_data['records'])
    Appended_df = pd.concat([Appended_df, df], ignore_index=True)
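Querying record by record can be slow for long ID lists. As an alternative to the loop above (a sketch, not part of the original flow), the values can be batched into a single SOQL IN clause; for very large lists, split IdList into chunks to stay within SOQL query-length limits:
# Batch the lookup values into one SOQL IN (...) filter
quoted = ", ".join("'%s'" % i for i in IdList)
soql = ("SELECT Id, npe01__WorkEmail__c, AccountId FROM Contact "
        "WHERE npe01__WorkEmail__c IN (%s)" % quoted)
sf_contact_data = sf.query_all(soql)
Appended_df = pd.DataFrame(sf_contact_data['records'])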
Loading Response Data from Salesforce to BigQuery
The appended data frame, which holds the response from Salesforce, is loaded into a temporary BigQuery table. This temporary table is then used as the source table to update the target BigQuery table; the update makes it possible to identify data that was not received by Salesforce.
Now, check for the records that do not have the response column updated in the BigQuery target table and send those records to Salesforce again for processing.
Appended_df.to_gbq(gbq_table_name, project_id, chunksize=None, if_exists='replace')
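The update of the target table from the temporary table, and the follow-up check for records that received no response, can be expressed as BigQuery SQL. The sketch below is illustrative only: the table names and the response column sf_response_id are placeholders, and it reuses the google-cloud-bigquery client shown earlier.
from google.cloud import bigquery

bq_client = bigquery.Client(project='project_id')

# Mark target rows for which Salesforce returned a record (sf_response_id is a hypothetical column)
update_sql = """
UPDATE `project_id.dataset_id.target_table` AS t
SET t.sf_response_id = s.Id
FROM `project_id.dataset_id.temp_table` AS s
WHERE t.Account_Id__c = s.AccountId
"""
bq_client.query(update_sql).result()

# Rows still missing a response are candidates to re-send to Salesforce
retry_sql = """
SELECT * FROM `project_id.dataset_id.target_table`
WHERE sf_response_id IS NULL
"""
retry_df = bq_client.query(retry_sql).to_dataframe()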
Automating the Process Using Airflow
You can schedule this Python script to run automatically at a set time using Apache Airflow. Follow the steps below to deploy the code and automate it.
- Import the necessary Python libraries as a first step
- Create the DAG (Directed Acyclic Graph) object and the required variables - include the “schedule_interval” argument in this step so the DAG runs automatically
- Add your tasks, wrapping the data transfer script discussed above in one of them (see the sketch after this list)
- Define the dependencies, i.e., the order of execution of the tasks
- Put this code into a file, “sample.py”
- Put “sample.py” file in the dags/ folder of Airflow
- See your DAG in action!
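A minimal sketch of what “sample.py” could look like (the DAG id, schedule, and function names are illustrative assumptions; the transfer function would wrap the script discussed above):
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def transfer_bigquery_to_salesforce():
    # Wrap the BigQuery -> Salesforce -> BigQuery script shown above in this function
    ...


def check_and_resend():
    # Re-send records whose response column was not updated in the target table
    ...


# DAG object with a daily schedule; adjust schedule_interval as needed
with DAG(
    dag_id='bigquery_salesforce_sync',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    transfer_task = PythonOperator(
        task_id='transfer_bigquery_to_salesforce',
        python_callable=transfer_bigquery_to_salesforce,
    )

    resend_task = PythonOperator(
        task_id='check_and_resend',
        python_callable=check_and_resend,
    )

    # Define the order of execution of the tasks
    transfer_task >> resend_task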
Conclusion
This simple script saves a lot of manual work, like cross-checking data between BigQuery and Salesforce. As you can see, a few simple lines of code coupled with an SOQL query get the job done automatically, without the need for expensive third-party solutions.
