Consume Qualtrics survey results in SAP Analytics Cloud with live connection to SAP HANA service

More than ever, businesses around the world are recognizing the importance of treating their customers and employees like “real” people and not as “users”, “buyers”, “resources” and so on. Welcome to the experience economy! It is time everyone understood the importance of experience management and the business value generated by the insights obtained from combining X-data and O-data.

SAP offers some powerful solutions to enable such an experience economy. SAP Business Technology Platform sits at the core and enables this by connecting X-data from Qualtrics with O-data from various SAP Line of Business services. SAP Analytics Cloud is the go-to analytics solution on the platform. It can be leveraged for data-driven insights that connect O-data from the Lines of Business to X-data from Qualtrics, driving decisions that create “people experiences”.

 

For some time now, SAP Analytics Cloud (SAC) has had a standard connector to Qualtrics to fetch and analyze survey responses. Refer to this blog post by Rituparna Reddi for more information. The Qualtrics connector makes it easy and quick to consume Qualtrics survey results in SAC in conjunction with O-data from other data sources.

In this blog post, however, I will discuss the technical details of another approach, in which I have done the following:

  • Pull the Qualtrics survey result and write data into SAP HANA on SAP Cloud Platform every time a response is submitted.
  • Replicate data from Line of Business(es) into HANA
  • Correlate this data through calculation views
  • Consume the calculation view in SAC for analysis

Use case:

For the sake of better understanding, I have come up with a simple use case.

William James, the American psychologist, said: “The deepest principle of human nature is a craving to be appreciated.”

There are numerous studies which suggest that a lack of appreciation at work is one of the major reasons for employee turnover in a company. It is also believed that appreciation has a direct impact on employee productivity. Here is an article which puts this in perspective. What if we could correlate responses to specific questions about employee appreciation with the number of awards given out and the number of achievements logged in a SuccessFactors system?

Here’s a quick architecture diagram:

 

Let’s now look at the details of the prototype:

1. Pull the Qualtrics survey result and write data into SAP HANA on SAP Cloud Platform every time a response is submitted.

Here’s the approach:

1.a. I have an employee engagement survey of type Employee XM created in Qualtrics.

1.b. I have my HANA service instance up and running, and I have created a multi-target application with a HANA DB module. For starters, here is a great developer tutorial you can refer to.

Here is a snapshot of what my entity looks like:

 

1.c. I have used Qualtrics APIs to read the response in real time, every time a response is completed and submitted. This data is written into the table that I generated in the last step.

Refer to this well-documented guide from Qualtrics on how to listen to and retrieve responses in real time.
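The script later in this post reads the fields SurveyID, ResponseID and CompletedDate from the form-encoded body that the web hook receives. As a minimal sketch, assuming the body is plain URL-encoded key/value pairs as that script implies, the standard library can do the decoding. The function name parse_webhook_body and the sample IDs are made up for illustration.

```python
from urllib.parse import parse_qs


def parse_webhook_body(body: bytes) -> dict:
    """Decode a form-encoded notification body into a flat dict."""
    # parse_qs percent-decodes keys and values and maps each key to a list.
    parsed = parse_qs(body.decode("utf-8"), keep_blank_values=True)
    # Keep the first value per field; repeated fields are not expected here.
    return {key: values[0] for key, values in parsed.items()}


if __name__ == "__main__":
    # Hypothetical sample payload, not a real Qualtrics notification.
    sample = b"SurveyID=SV_123&ResponseID=R_456&CompletedDate=2020-03-26%2010%3A15%3A00"
    print(parse_webhook_body(sample))
```

Compared with splitting on "&" and "=" by hand, this also copes with values that themselves contain an "=" character.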

Essentially the steps are as follows:

  • I have hosted a Python-based web hook on SAP Cloud Platform Cloud Foundry (CF), which reads a response using the Qualtrics Response API and writes it to the HANA table using the hdbcli library. Find the script below, and the complete project on GitHub.

 

from http.server import BaseHTTPRequestHandler, HTTPServer
from hdbcli import dbapi
from urllib.parse import urlparse
import urllib
import sys
import requests
import io, os
import json
import zipfile
import re
import csv
import datetime

def onResponse(apiToken, surveyId, dataCenter):

    fileFormat = "csv"

    #Step 1 : Export Survey
    fileId = exportSurvey(apiToken,surveyId, dataCenter, fileFormat)

    #Step 2 : Parse file for records
    records = parseSurveyExport(fileId)

    #Step 3 : insert records in HANA DB
    writeRecordstoDB(records, surveyId)


def exportSurvey(apiToken, surveyId, dataCenter, fileFormat):

    # Setting static parameters
    requestCheckProgress = 0.0
    progressStatus = "inProgress"
    baseUrl = "https://{0}.qualtrics.com/API/v3/surveys/{1}/export-responses/".format(dataCenter, surveyId)
    headers = {
        "content-type": "application/json",
        "x-api-token": apiToken
    }

    #get Last timestamp
    startDate = getLastTimeStamp(surveyId)
    
    # Step 1: Creating Data Export , get Responses after the last timestamp
    downloadRequestUrl = baseUrl

    # Build the payload as a dict and let json.dumps handle quoting and escaping
    payload = {"format": fileFormat, "useLabels": True}
    if startDate != '':
        payload["startDate"] = startDate
    downloadRequestPayload = json.dumps(payload)

    downloadRequestResponse = requests.request("POST", downloadRequestUrl, data=downloadRequestPayload, headers=headers)
    progressId = downloadRequestResponse.json()["result"]["progressId"]
    print(downloadRequestResponse.text)

    # Step 2: Checking on Data Export Progress and waiting until export is ready
    while progressStatus != "complete" and progressStatus != "failed":
        print ("progressStatus=", progressStatus)
        requestCheckUrl = baseUrl + progressId
        requestCheckResponse = requests.request("GET", requestCheckUrl, headers=headers)
        requestCheckProgress = requestCheckResponse.json()["result"]["percentComplete"]
        print("Download is " + str(requestCheckProgress) + " complete")
        progressStatus = requestCheckResponse.json()["result"]["status"]

    #step 2.1: Check for error
    if progressStatus == "failed":
        raise Exception("export failed")

    fileId = requestCheckResponse.json()["result"]["fileId"]

    # Step 3: Downloading file
    requestDownloadUrl = baseUrl + fileId + '/file'
    requestDownload = requests.request("GET", requestDownloadUrl, headers=headers, stream=True)

    # Step 4: Unzipping the file
    try:
        zipfile.ZipFile(io.BytesIO(requestDownload.content)).extractall("MyQualtricsDownload")
    except Exception as e:
        # str(e) is needed: concatenating a str and an exception raises TypeError
        raise Exception("unzip failed: " + str(e))

    fileName = requestDownload.headers['content-disposition']
    fileName = re.search('attachment; filename=(.+?).zip',fileName).group(1).replace("+"," ")

    return fileName

def parseSurveyExport(fileId):
    columnNumbers = {
                        "questionAnswerColumns":[],
                        "ResponseId": 0,
                        "managerID" : 0,
                        "employeeID" : 0,
                        "RecordedDate" : 0
                    }

    questions = []
    insertRecords = []
    
    with open("MyQualtricsDownload/" + fileId + ".csv") as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        line_count = 0
        for row in csv_reader: 
            # get column numbers
            if line_count == 0:
                columnCount = len(row)
                for i in range(columnCount): 
                    if "SID" in row[i] or re.search("^Q(.*?)[0-9]", row[i]) :
                        columnNumbers["questionAnswerColumns"].append(i)
                    elif "RecordedDate" in row[i]:
                        columnNumbers["RecordedDate"] = i
                    elif "ResponseId" in row[i]:
                        columnNumbers["ResponseId"] = i
                    elif "Employee ID" in row[i]:
                        columnNumbers["employeeID"] = i
                    elif "Manager ID" in row[i]:
                        columnNumbers["managerID"] = i
                line_count += 1
            # get questions text
            elif line_count == 1:
                for columnNumber in columnNumbers["questionAnswerColumns"]:
                    question = {}
                    question["text"] = row[columnNumber]
                    question["columnNumber"] = columnNumber
                    questions.append(question)
                line_count += 1
            # get questions id
            elif line_count == 2:
                for q in questions:
                    q["id"] = re.search('{"ImportId":"(.+?)"}', row[q["columnNumber"]]).group(1)
                line_count += 1
            # get response records
            else:
                for q in questions:
                    record = {}
                    record["responseId"] = row[columnNumbers["ResponseId"]]
                    record["questionId"] = q["id"]
                    record["language"] = "en"
                    record["question"] = q["text"]
                    record["response"] = row[q["columnNumber"]]
                    if columnNumbers["managerID"] > 0:
                        record["managerId"] = row[columnNumbers["managerID"]]
                    else:
                        record["managerId"] = ''
                    if columnNumbers["employeeID"] > 0:
                        record["employeeID"] = row[columnNumbers["employeeID"]]
                    else:
                        record["employeeID"] = ''
                    record["responseDate"] = row[columnNumbers["RecordedDate"]]
                    insertRecords.append(record)
                line_count += 1
        print(f'Processed {line_count} lines.')
    
    return insertRecords

def writeRecordstoDB(records, surveyId):
    #Step 1 : Open connection to HDB
    conn = open_hdb_conn()

    #Step 2 : write records to HDB
    if conn and conn.isconnected():
        print("connection to HDB open")
        conn.setautocommit(False)
        cursor = conn.cursor()
        # Bind the values as parameters instead of concatenating them into the SQL text,
        # so a quote inside a question or response cannot break the statement
        sql = "INSERT INTO \"<SCHEMA>\".\"<TABLE>\" VALUES(\"<SCHEMA>\".\"rid\".NEXTVAL, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
        for record in records:
            values = (record["responseId"], record["questionId"], record["language"], record["question"], record["response"], record["managerId"], record["employeeID"], record["responseDate"], surveyId)
            cursor.execute(sql, values)
            conn.commit()
            rowcount = cursor.rowcount
            if rowcount == 1:
                print("record is inserted")
    
    #Step 3 : close connection to HDB
    close_hdb_conn(conn)

def getLastTimeStamp(surveyId):
    #Step 1 : Open connection to HDB
    conn = open_hdb_conn()

    #Step 2 : Get latest timestamp
    startDateforExportString = ''
    if conn and conn.isconnected():
        # The survey ID is bound as a parameter rather than concatenated into the SQL text
        sql = "SELECT TOP 1 \"RESPONSEDATE\" FROM \"<SCHEMA>\".\"<TABLE>\" as \"response\" where \"SURVEYID\" = ? order by \"response\".\"RESPONSEDATE\" desc"
        cursor = conn.cursor()
        cursor.execute(sql, (surveyId,))
        row = cursor.fetchone()
        if row and len(row) == 1:
            lastResponseDate = row[0]
            startDateforExport = lastResponseDate + datetime.timedelta(0,1)
            startDateforExportString = startDateforExport.strftime("%Y-%m-%dT%H:%M:%SZ")
            
    #Step 3 : close connection to HDB
    close_hdb_conn(conn)

    return startDateforExportString

def open_hdb_conn():
    print("opening connection to HDB")
    try:
        conn = dbapi.connect(address="<DB Host>", encrypt="true", port="<DB port>", user="<DB user>", sslValidateCertificate='false', password="<pwd>")
    except Exception as e:
        # str(e) is needed: concatenating a str and an exception raises TypeError
        raise Exception("Open connection failed: " + str(e))

    return conn

def close_hdb_conn(conn):
    if conn:
        try:
            conn.close()
            print("connection to HDB closed")
        except Exception as e:
            if conn and not conn.isconnected():
                print("connection to HDB closed")
            
def getReponse(d, dataCenter, apiToken):
    responseId = d['ResponseID']
    surveyId = d['SurveyID']
    
    headers = {
        "content-type": "application/json",
        "x-api-token": apiToken,
       }

    url = "https://{0}.qualtrics.com/API/v3/surveys/{1}/responses/{2}".format(dataCenter, surveyId, responseId)

    
    rsp = requests.get(url, headers=headers)
    print(rsp.json())

def parsey(c):
    x=c.decode().split("&")
    d = {}
    for i in x:
        a,b = i.split("=")
        d[a] = b

    d['CompletedDate'] = urllib.parse.unquote(d['CompletedDate'])

    return d

class Handler(BaseHTTPRequestHandler):

    # POST
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        d = parsey(post_data)
        surveyId = d['SurveyID']

        try:
            # Read the credentials from the environment, as the error message below expects
            apiToken = os.environ["APIKEY"]
            dataCenter = os.environ["DATACENTER"]
        except KeyError:
            print("set environment variables APIKEY and DATACENTER")
            sys.exit(2)

        #import all responses for survey and write to database
        #onResponse(apiToken, surveyId, dataCenter)

        #get single response
        getReponse(d, dataCenter, apiToken)

        # Acknowledge the notification so the caller is not left waiting for a reply
        self.send_response(200)
        self.end_headers()
 
if __name__ == '__main__':
    
    print('starting server...')
    server_address = ('0.0.0.0', 8080)
 
    httpd = HTTPServer(server_address, Handler)
    print('running server...')
    httpd.serve_forever()
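
The export loop in exportSurvey above queries the progress endpoint back to back, without pausing between requests and without an upper time limit. A minimal sketch of a more forgiving variant follows. The helper name wait_for_export and its parameters are my own, not part of the Qualtrics API, and it assumes only that the status check returns a dict with a "status" key, as the "result" object in the script does.

```python
import time


def wait_for_export(check_status, poll_interval=1.0, timeout=300.0, sleep=time.sleep):
    """Poll check_status() until the export completes, fails or times out.

    check_status is any callable returning a dict with a "status" key,
    for example the "result" object of the export progress response.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = check_status()
        status = result["status"]
        if status == "complete":
            return result
        if status == "failed":
            raise RuntimeError("export failed")
        if time.monotonic() >= deadline:
            raise TimeoutError("export did not finish in time")
        # Pause between checks instead of polling in a tight loop.
        sleep(poll_interval)
```

In the script this could be called with a small function that performs the GET on requestCheckUrl and returns the "result" part of the JSON; the "fileId" is then read from the returned dict.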

 

  • I have created an event subscription on the Qualtrics server, which calls this web hook on SAP Cloud Platform every time a response is completed and submitted:
curl -X POST -H 'X-API-TOKEN: yourapitoken'  -H 'Content-Type: application/json' -d '{
    "topics": "surveyengine.completedResponse.yoursurveyid",
    "publicationUrl": "http://<app url on CF>.hana.ondemand.com",
    "encrypt": false
}' 'https://co1.qualtrics.com/API/v3/eventsubscriptions/'
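
For readers who prefer to register the subscription from Python rather than curl, here is a rough equivalent using only the standard library. It mirrors the URL, headers and JSON body of the curl call above. The function name build_subscription_request and the example publication URL are assumptions for illustration, and the request is only built, not sent.

```python
import json
import urllib.request


def build_subscription_request(api_token, data_center, survey_id, publication_url):
    """Build (but do not send) the POST request that registers the web hook."""
    payload = {
        "topics": "surveyengine.completedResponse." + survey_id,
        "publicationUrl": publication_url,
        "encrypt": False,
    }
    return urllib.request.Request(
        url="https://{0}.qualtrics.com/API/v3/eventsubscriptions/".format(data_center),
        data=json.dumps(payload).encode("utf-8"),
        headers={"X-API-TOKEN": api_token, "Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Placeholder values, as in the curl example; the URL is hypothetical.
    request = build_subscription_request(
        "yourapitoken", "co1", "yoursurveyid", "https://example.com/webhook"
    )
    print(request.get_method(), request.full_url)
    # To actually create the subscription, uncomment:
    # with urllib.request.urlopen(request) as response:
    #     print(response.read().decode("utf-8"))
```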

 

2. Replicating data from Line of Business(es) into HANA using Smart Data Integration (SDI)

SuccessFactors provides APIs to access data, enable open integration and allow easy extensions. The APIs are of type SOAP or OData. For this prototype I am interested in the OData API for Continuous Performance Management.

In this step I have done the following:

2.a. Set up the OData adapter on the HANA service for Smart Data Integration.

2.b. Configured the SuccessFactors system as a remote source system for data replication.

You can follow this blog post to set up the adapter and configure the SuccessFactors system for data replication.

2.c. Created virtual tables for the tables of interest. I am interested in getting the User, Achievement and SpotAward details from the SuccessFactors system.

2.d. Created flowgraphs to extract only the fields I need, and set up replication tasks.

While creating virtual tables from a remote source, you need to ensure that the technical user of your HDI container is authorized to access the remote source. You can take a look at the following link to understand how to do this:

https://blogs.sap.com/2019/02/23/smart-data-integration-cross-container-access-and-the-sap-hana-service/

As always, one of the best references for anything HANA-related is SAP HANA Academy. To understand and create flowgraphs and replication tasks, refer to the SAP HANA Academy playlist “how to use SDI in SAP HANA Service”.

Let’s see how this looks in my Web IDE:

SFSF Remote source:

 

Once I have my access setup from my MTA project to the remote source (as per the steps in the blog/video referenced previously), I can set up my virtual tables.

I have created 3 virtual tables, for User, Achievement and SpotAward.

VIRTUAL TABLE "SFSFAchievement" AT "sfsf"."Achievement"

VIRTUAL TABLE "SFSFSpotAward" AT "sfsf"."SpotAward"

VIRTUAL TABLE "SFSFUser" AT "sfsf"."User"

Here is the flowgraph in my Web IDE. It maps only the fields that I am interested in from the virtual tables on the source side to new tables on the target side.

 

So now I have my target tables and data, which I can consume in my calculation views.

 

3. Correlate this data through calculation views

I created calculation views to correlate the SuccessFactors data on how many achievements an employee has logged and how many awards they have received with how they feel, as reported in the survey.
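The calculation views themselves are modeled in HANA, so the following is only an illustration of the join logic in plain Python, not the actual view definition. It counts achievements and awards per employee and attaches those counts to each survey answer. The function name, the field names and the sample rows are all assumptions made for this sketch.

```python
from collections import Counter


def correlate(survey_rows, achievements, awards):
    """Attach per-employee achievement and award counts to each survey answer."""
    # Counter returns 0 for employees without any achievement or award.
    achievement_counts = Counter(row["employeeId"] for row in achievements)
    award_counts = Counter(row["employeeId"] for row in awards)
    return [
        {
            "employeeId": row["employeeId"],
            "response": row["response"],
            "achievements": achievement_counts[row["employeeId"]],
            "awards": award_counts[row["employeeId"]],
        }
        for row in survey_rows
    ]


if __name__ == "__main__":
    # Hypothetical sample data.
    survey = [
        {"employeeId": "E1", "response": "Disagree"},
        {"employeeId": "E2", "response": "Agree"},
    ]
    achievements = [{"employeeId": "E1"}, {"employeeId": "E1"}, {"employeeId": "E2"}]
    awards = [{"employeeId": "E2"}]
    for row in correlate(survey, achievements, awards):
        print(row)
```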

 

 

 

Here’s the view of the correlated data from the calculation view, which compares employee appreciation with actual achievements and awards given out:

 

 

4. Consume the calculation view in SAC for analysis

I have consumed the calculation views created in my HANA service on Cloud Foundry through a live connection in SAC. You can refer to this blog post, which collates the information you need to set up such a connection.

Here is a screenshot of the SAC story, which shows X-data and O-data together: in most cases where an employee is demotivated and feels unappreciated, there is a direct relation to quantifiable numbers, namely how many achievements they have logged and how many awards were given out. I have set up an auto refresh for the model every 5 seconds.

 

Every time a new survey response is submitted, the new data is written to HANA, and the replication tasks also write new data from the SuccessFactors system into HANA. Because I also have an auto-refresh configured on the SAC story, the data that you see on the SAC dashboard will mostly be the latest.

Cheers!

Original Article:
https://blogs.sap.com/2020/03/26/consume-qualtrics-survey-results-real-time-in-sap-analytics-cloud-using-sap-hana-service/
