sudopower

TikTok data scraping

If you do not live under a cave, you sure must have heard of TikTok.
It allows users to create 15-second videos, sound tracked by music clips.
It sounds simple enough, but it is a wildly popular concept.

I am not a big fan but had to study it for one of our customers.

Here are some statistics. I’ll talk a little about this study then share how you can set up a flow to collect TikTok data. Click here to skip to code 📊 💻.

TikTok Stats at the time of writing-

  • Around 689 million active users.
  • It has over 2 billion downloads on the play store.
  • Over 1 billion total video views.
  • Available in 155 countries.
  • On average, a user spends 52 minutes per day!

source

It’s another decoy for getting meaningful things done in life 😠. But I’ll try to hate it less for the sake of this post 🙄

Heisenbergers GIFs - Get the best GIF on GIPHY

So it seems very useful for the following purposes –

Ads (Snuck in your feed)

Influencer Marketing

Branding

Customer Engagement

TikTok ads cost more than other social media platforms, with CPM as high as 10$.

Influencer marketing is less annoying and yields good returns since they’ve created their niche audience who are faithful to them 😇 At least for the time being 😟 until they get bored 😐 the influencer runs out of content 😢 or people find someone more interesting 🤩 But it seems relatively cheap and more effective.

So which data would you collect? It’s very subjective but, I think these could be useful

  1. Follower count
  2. Following count
  3. # of views on a post
  4. Likes on a post
  5. Comments on a post
  6. Shares of a post
  7. Hearts received (not literally)
  8. Delta counts per day of the above stats!
  9. Posts liked by a user
  10. Backing up user posts (Downloading)

To collect data from TikTok, they provide an API for content management and marketing. But we want to gather other user’s data, which they don’t allow 🙁

TikTok has a web app 🤔 So we turn to scrape ⛏️💪 !!!!!!!!!!

This library provides an API to scrape TikTok data from their web app using Selenium or Playwright Web Automation.

https://github.com/davidteather/TikTok-Api

TikTok realizes this and blocks your IP if you try to scrape data.
So what do we do?

Good Luck, I'm Behind 7 Proxies | Know Your Meme

Just use a Proxy Server !

Let’s outline the steps –

  1. Collect a list of TikTok handles you want to scrape.
    1. It can be a list of suggested accounts on TikTok from a seed account.
    2. It can be a list of Tiktok handles that users registered on your website.
    3. It can be a list of handles you’re managing or want to monitor (or spy ? 😈 ).
  2. Collect a list of stats required.
  3. If storing in a database, design a schema.
  4. Set up a script to loop over your list of users.
  5. Create a proxy server / Buy a service
    1. Buy a service ? (https://www.techradar.com/in/best/proxies) OR
    2. To create your own proxy server-
      1. Subscribe to ADEO Anonymous HTTPS Proxy (https://aws.amazon.com/marketplace/pp/prodview-lo2helasbjorq)
      2. Create an AWS EC2 instance
      3. Attach an elastic IP
      4. That’s it. This elastic IP is your proxy server IP!
  6. Create a headless browser instance using this proxy IP (The library has inbuilt functions for this)
  7. Call the profile details functions from the library. 
  8. Parse each JSON record and store it as a row in a CSV
  9. Import into your database, if applicable.
  10. Run static DB queries to populate the delta counts.

It roughly looks like this-

import requests
from TikTokApi import TikTokApi
import json

import utilityFunctions #custom made helper functions for eg Json key value to csv data
import time



class Tiktok( utilityFunctions):

	def __init__(self,args):
        #initialize global vars
		
		utilityFunctions.__init__(self, args)		
		
		#Put your configurations in a JSON file
        self.data_call = self.extractor_config['data']['call'][0]
		
		#proxy ip for avoiding reCaptcha after too many API Calls, in this case I've setup a proxy server on EC2 that rotates IPs
		self.proxy = self._chainedGet(self.data_call,"proxy_ip")
		
		#api response data file dump path
		self.local_tmp_file = self._chainedGet(self.data_call,"local_tmp_file")
		
		#response file content type
		self.content_type = self._chainedGet(self.data_call,"content_type")
		
		#Delay in seconds before each API call
		self.delay = self._chainedGet(self.data_call,"scrape_delay")

		#List of static tiktok hanldes to be scrapped, if you don't use API or want to manually add tiktok handles
		self.static_tiktok_handles = self._chainedGet(self.data_call,"static_tiktok_handles")
		
		if self.delay is None:
			self.delay = 5
		
		#user listing API/ this will give list of users to get the TikTok handles for
		self.user_list_api_url = self._chainedGet(self.data_call,"user_list_api_url")
		
		if self.proxy is not None:
			self.api = TikTokApi.get_instance(proxy=self.proxy)
		else:
			self.api = TikTokApi.get_instance()
		
		self.userList = self.getUserList()

		if self.static_tiktok_handles is not None and len(self.static_tiktok_handles)>0:
			self.userList = self.static_tiktok_handles + self.userList

	def getTikTokUserLikeList(self,tiktokUsername):
        #To scrape and assemble a JSON of posts Liked by a tiktok handle
		try:
			self.logger.info("Fetching User Likes for : "+tiktokUsername)			
			try:
				likeListDataResponse = self.api.userLikedbyUsername(tiktokUsername,count=2000, language='en')
			except Exception as e:
				self.logger.exception("Cannot fetch user "+tiktokUsername+": "+str(e))
				return
			self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
			time.sleep(self.delay)   
			if likeListDataResponse is not None:
				likeListData=[]
				
				for item in likeListDataResponse:
					item["uniqueId"]=tiktokUsername
					likeListData.append(item)

			return likeListData
		except Exception as e:
			self.logger.exception("Error in getTikTokUserLikeList: "+str(e))

	def getTikTokVideoData(self,transformerObj):
        #To loop over accounts and assemble all users posts details in a list of json objects
		try:
			local_tmp_file = self.local_tmp_file
			content_type = self.content_type

			if local_tmp_file is None:
				local_tmp_file = "/tmp/VideoData.json"

			if content_type is None:
				content_type = "json"

			self.transformerObj = transformerObj
			
			userList = self.userList

			TikTokData = []

			for idx,username in enumerate(userList):  
				self.logger.info("Total  Users: "+str(len(userList)))
				self.logger.info("Processing user no "+str(idx+1)+"  Username: "+username)
				try:
					TikTokUsername = self.getTikTokHandle(username)[0] 
					Id = self.getTikTokHandle(username)[1] 
					if TikTokUsername is None and username in self.static_tiktok_handles:
						TikTokUsername = username
               
					if TikTokUsername is not None:
						self.logger.info("TikTok handle: "+TikTokUsername)
						self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
						time.sleep(self.delay)    
						TikTokUserProfileData = self.getTikTokProfileVideos(TikTokUsername)
								
						TikTokData.extend(TikTokUserProfileData)
				except Exception as e:
					self.logger.exception("Error in getTikTokVideoData: "+str(e))
			
			data_count = len(TikTokData)

			with open(local_tmp_file, 'w', encoding='utf8') as json_file:
				json.dump(TikTokData, json_file, ensure_ascii=False)
			
			data_config = {
			"content_type": content_type
			}
			
			aFileContents = self.extractFileData(data_config, local_tmp_file)

			rTransformer = self.transformerObj.transform_process( aFileContents )

			self.pagination["total"] += data_count

			if( rTransformer != True ):
				self.logger.exception("Transformation Error")
				raise Exception("Transformation Error")

			return True
		except Exception as e:
			self.logger.exception("Error in getTikTokVideoData :"+str(e))
		

	def getTikTokUserData(self,transformerObj):
        #To loop over accounts and assemble all users profile details in a list of json objects
		try:
			local_tmp_file = self.local_tmp_file
			content_type = self.content_type

			if local_tmp_file is None:
				local_tmp_file = "/tmp/UserData.json"

			if content_type is None:
				content_type = "json"

			self.transformerObj = transformerObj
			
			userList = self.userList
			

			TikTokData = []
			for idx,username in enumerate(userList):  
				self.logger.info("Total  Users: "+str(len(userList)))
				self.logger.info("Processing user no "+str(idx+1)+"  Username: "+username)
				try:
					TikTokUsername = self.getTikTokHandle(username)[0]  
					Id = self.getTikTokHandle(username)[1]            
					if TikTokUsername is None and username in self.static_tiktok_handles:
						TikTokUsername = username

					if TikTokUsername is not None:
						self.logger.info("TikTok handle: "+TikTokUsername)
						self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
						time.sleep(self.delay)    
						TikTokUserProfileData = self.getTikTokProfile(TikTokUsername)
							
						TikTokData.extend(TikTokUserProfileData)
				except Exception as e:
					self.logger.exception("Error in getTikTokUserData: "+str(e))

			data_count = len(TikTokData)

			with open(local_tmp_file, 'w', encoding='utf8') as json_file:
				json.dump(TikTokData, json_file, ensure_ascii=False)
			
			data_config = {
			"content_type": "json"
			}

			aFileContents = self.extractFileData(data_config, local_tmp_file)

			self.pagination["total"] += data_count
			rTransformer = self.transformerObj.transform_process( aFileContents )

			if( rTransformer != True ):
				self.logger.exception("Transformation Error")
				raise Exception("Transformation Error")

			return True
		except Exception as e:
			self.logger.exception("Error in getTikTokUserData :"+str(e))
		
	def getTikTokUserLikeListData(self,transformerObj):
        #To loop over accounts and assemble all users liked tiktok posts details in a list of json objects
		try:
			
			local_tmp_file = self.local_tmp_file
			content_type = self.content_type
			

			if local_tmp_file is None:
				local_tmp_file = "/tmp/UserLikeListData.json"

			if content_type is None:
				content_type = "json"

			self.transformerObj = transformerObj
			
			userList = self.userList

			TikTokData = []
			for idx,username in enumerate(userList):  
				self.logger.info("Total  Users: "+str(len(userList)))
				self.logger.info("Processing user no "+str(idx+1)+"  Username: "+username)
				try:
					TikTokUsername = self.getTikTokHandle(username)[0]
					
					if TikTokUsername is None and username in self.static_tiktok_handles:
						TikTokUsername = username
               
					if TikTokUsername is not None:
						self.logger.info("TikTok handle: "+TikTokUsername)
						self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
						time.sleep(self.delay)    
						TikTokUserProfileLikeListData = self.getTikTokUserLikeList(TikTokUsername)
						if TikTokUserProfileLikeListData is not None and len(TikTokUserProfileLikeListData)>0:
							TikTokData.extend(TikTokUserProfileLikeListData)
				except Exception as e:
					self.logger.exception("Error in getTikTokUserData: "+str(e))

			data_count = len(TikTokData)

			with open(local_tmp_file, 'w', encoding='utf8') as json_file:
				json.dump(TikTokData, json_file, ensure_ascii=False)
			
			data_config = {
			"content_type": "json"
			}

			aFileContents = self.extractFileData(data_config, local_tmp_file)

			self.pagination["total"] += data_count
			rTransformer = self.transformerObj.transform_process( aFileContents )

			if( rTransformer != True ):
				self.logger.exception("Transformation Error")
				raise Exception("Transformation Error")

			return True
		except Exception as e:
			self.logger.exception("Error in getTikTokUserData :"+str(e))

	def getUserList(self):
		#This will call an API to get the list of users 
		if self.user_list_api_url is None:
			self.user_list_api_url = "<YOU API URL that return USER>" 
		
		UserListResponse = requests.get(self.user_list_api_url, params="", headers="",timeout=5)
		UserListResponseJson = json.loads(UserListResponse.text)
		userList = []
		for user in UserListResponseJson["data"]["data"]:
			userList.append(user["user"]["username"])
		return userList

	def getTikTokHandle(self,username):
        #This will call another API to get the tiktok handle for that user
		if username is None:
			self.logger.info("No Tiktok handle for user")
			return
		tiktok_handle = None

		try:
			UserProfileAPIURL = "YOUR API URL/get-user-details-endpoint/"+username
			UserProfileResponse = requests.get(UserProfileAPIURL, params="", headers="",timeout=5)
			UserProfileResponseJson = json.loads(UserProfileResponse.text)
			
			for social_network in UserProfileResponseJson["data"]["social_networks"]:        
				tiktok_handle = social_network["tiktok_handle"]
						
			return [tiktok_handle]
		except Exception as e:
			self.logger.exception("Error in getTikTokHandle: "+str(e))


	def getTikTokProfileVideos(self,tiktokUsername):
        #To get the stats for tiktok posts / videos
		try:
			self.logger.info("Fetching Posts for user: "+tiktokUsername)
			try:
				profileGenerator = self.api.getUserPager(tiktokUsername, cursor=0)
			except Exception as e:
				self.logger.exception("Cannot fetch user "+tiktokUsername+": "+str(e))
				return
			tikTokList = []
			count = 0
			for item in profileGenerator:
				if item is not None:        
					self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
					time.sleep(self.delay)    
					tikTokList.extend(item)
					count = count + len(item)
				self.logger.info("Posts fetched:"+str(count))
			return tikTokList
		except Exception as e:
			self.logger.exception("Error in getTikTokProfileVideos: "+str(e))

	def getTikTokProfile(self,tiktokUsername):
        #To scrape and assemble a JSON of TikTok Profile Data of a tiktok handle
		try:
			self.logger.info("Fetching User Profile for : "+tiktokUsername)
			try:
				profileDataResponse = self.api.getUser(tiktokUsername, cursor=0)
			except Exception as e:
				self.logger.exception("Cannot fetch user "+tiktokUsername+": "+str(e))
				return
			self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
			time.sleep(self.delay)    
			profileData = profileDataResponse["userInfo"]
			
			return [profileData]
		except Exception as e:
			self.logger.exception("Error in getTikTokProfile: "+str(e))

If you’re going to store it in a database, you can create a schema like below-

/*Table for TikTok UserData*/
create table IF NOT EXISTS orgUserData(
    org_id bigint(20) NOT NULL,
    userId bigint(20) NOT NULL,
    uniqueId varchar(255) NOT NULL,
    nickname varchar(255) DEFAULT NULL,
    userCreatedDate datetime DEFAULT NULL,
    avatarThumb text DEFAULT NULL,
    secUid varchar(255) DEFAULT NULL,
    privateAccount tinyint(1) DEFAULT '0',
    secret tinyint(1) DEFAULT '0',
    unique (org_id)
);

/*Table for TikTok UserStats*/
create table IF NOT EXISTS orgUserStats(
    userId bigint(20) NOT NULL,
    followerCount bigint(20) DEFAULT NULL,
    followingCount bigint(20) DEFAULT NULL,
    heartCount bigint(20) DEFAULT NULL,
    videoCount bigint(20) DEFAULT NULL,
    diggCount bigint(20) DEFAULT NULL,
    unique(userId)
);


/*Table for TikTok Posts meta Info*/
create table IF NOT EXISTS orgContentData(
    contentId bigint(20) NOT NULL,
    contentTitle varchar(255) DEFAULT NULL,
    contentDescription text,
    contentCreatedDate datetime DEFAULT NULL,
    contentCover text DEFAULT NULL,
    contentOriginCover text DEFAULT NULL,
    contentType varchar(10) DEFAULT 'video',
    PRIMARY KEY (contentId)
);

/*Table for TikTok Posts & User Data Mapping with Content Stats*/
CREATE TABLE IF NOT EXISTS orgUserContentUploadData (
    userId bigint(20) NOT NULL,
    contentId bigint(20) NOT NULL,
    diggCount bigint(20) DEFAULT NULL,
    shareCount bigint(20) DEFAULT NULL,
    commentCount bigint(20) DEFAULT NULL,
    playCount bigint(20) DEFAULT NULL,
    PRIMARY KEY (contentId),
    CONSTRAINT orgUserContentUploadData_ibfk_1 FOREIGN KEY (contentId) REFERENCES orgContentData (contentId)
);

/*Table for TikTok Posts Stats*/
create table IF NOT EXISTS orgContentStats(
    contentId bigint(20) NOT NULL,
    diggCount bigint(20) DEFAULT NULL,
    shareCount bigint(20) DEFAULT NULL,
    commentCount bigint(20) DEFAULT NULL,
    playCount bigint(20) DEFAULT NULL,
    PRIMARY KEY (contentId),
    CONSTRAINT orgContentDataInStats_ibfk_1 FOREIGN KEY (contentId) REFERENCES orgContentData (contentId)
);


/*Table for TikTok User Likes*/
create table IF NOT EXISTS orgUserLikes(
    userId bigint(20) NOT NULL,
    contentId bigint(20) NOT NULL,
    UNIQUE KEY user_content_unqiue (userId, contentId)
);

/*For Delta counts*/
select
    distinct cd.userId,
    cd.contentId,
    tmp.diggCount - cd.diggCount as diggDiff,
    tmp.shareCount - cd.shareCount as diggDiff,
    tmp.commentCount - cd.commentCount as commentDiff,
    tmp.playCount - cd.playCount as playDiff
from
    orgContentDataTmp tmp
    join orgUserContentUploadData cd on cd.contentId = tmp.contentId;

/*Table to store stats delta per day*/
/*We use a different DB for this, can you guess ? */
CREATE TABLE orgUserContentUploadDataDeltas (
    userId int NOT NULL,
    contentId int NOT NULL,
    diggCount int DEFAULT NULL :: int,
    shareCount int DEFAULT NULL :: int,
    commentCount int DEFAULT NULL :: int,
    playCount int DEFAULT NULL :: int,
    date_time timestamp DEFAULT (now()) :: timestamptz(6)
);

In the End 🥁🥁🥁🥁   –

Now you can run an Analysis on this data!
Use it for machine learning, running campaigns with influencers,
monitoring account stats for optimizing engagement and lots more.

Want to get this setup ? with detailed reporting, analytic dashboards with useful KPIs, and tracking.
You can check out Catchmedia Inc (https://www.catchmedia.com/). We’ve got an awesome team 😎

In case you have any questions or suggestions for this post. Please leave comments.
Thanks for checking this out! 🙂