sudopower

TikTok data scraping

Unless you live under a rock, you have surely heard of TikTok.
It lets users create 15-second videos set to music clips.
It sounds simple enough, but it is a wildly popular concept.

I am not a big fan, but I had to study it for one of our customers.

Here are some statistics. I’ll talk a little about this study, then share how you can set up a flow to collect TikTok data 📊 💻.

TikTok stats at the time of writing:

  • Around 689 million active users.
  • Over 2 billion downloads on the Play Store.
  • Over 1 billion total video views.
  • Available in 155 countries.
  • On average, a user spends 52 minutes per day!

source

It’s another distraction from getting meaningful things done in life 😠. But I’ll try to hate it less for the sake of this post 🙄


So it seems very useful for the following purposes:

  • Ads (Snuck in your feed)
  • Influencer Marketing
  • Branding
  • Customer Engagement

TikTok ads cost more than ads on other social media platforms, with CPM as high as $10.

Influencer marketing is less annoying and yields good returns, since influencers have built a niche audience that is faithful to them 😇 At least for the time being 😟 until the audience gets bored 😐, the influencer runs out of content 😢, or people find someone more interesting 🤩 Still, it seems relatively cheap and more effective.

So which data would you collect? It’s very subjective, but I think these could be useful:

  1. Follower count
  2. Following count
  3. # of views on a post
  4. Likes on a post
  5. Comments on a post
  6. Shares of a post
  7. Hearts received (not literally)
  8. Delta counts per day of the above stats!
  9. Posts liked by a user
  10. Backing up user posts (Downloading)

TikTok provides an API for content management and marketing. But we want to gather other users’ data, which that API doesn’t allow 🙁

TikTok has a web app 🤔 So we turn to scraping ⛏️💪 !!!

The following library provides an API to scrape TikTok data from the web app using Selenium or Playwright web automation:

https://github.com/davidteather/TikTok-Api

TikTok realizes this and blocks your IP if you try to scrape data.
So what do we do?


Just use a proxy server!
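To make the proxy idea concrete, here is a minimal sketch of routing HTTP traffic through a proxy using only Python's standard library. The IP address is a placeholder from the documentation range, and port 3128 is an assumption; use whatever your proxy actually listens on. The helper names are hypothetical and not part of the TikTok-Api library.

```python
import urllib.request


def build_proxies(proxy_ip, port=3128):
    """Build a scheme -> proxy URL mapping (port 3128 is an assumed default)."""
    proxy_url = f"http://{proxy_ip}:{port}"
    # The same HTTP proxy endpoint handles both plain and TLS traffic.
    return {"http": proxy_url, "https": proxy_url}


def build_opener_with_proxy(proxies):
    """Return an opener whose requests are sent through the given proxy."""
    return urllib.request.build_opener(urllib.request.ProxyHandler(proxies))


# 203.0.113.10 is a placeholder; replace it with your proxy's IP.
proxies = build_proxies("203.0.113.10")
opener = build_opener_with_proxy(proxies)
print(proxies)

# A real request would then look like this (not executed here):
# response = opener.open("https://example.com", timeout=10)
```

The scraper further down passes the proxy to the library instead, but the principle is the same: every outgoing request leaves from the proxy's IP rather than yours.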

Let’s outline the steps –

  1. Collect a list of TikTok handles you want to scrape.
    1. It can be a list of suggested accounts on TikTok from a seed account.
    2. It can be a list of TikTok handles that users registered on your website.
    3. It can be a list of handles you’re managing or want to monitor (or spy ? 😈 ).
  2. Collect a list of stats required.
  3. If storing in a database, design a schema.
  4. Set up a script to loop over your list of users.
  5. Create a proxy server or buy a service.
    1. Buy a service (https://www.techradar.com/in/best/proxies), OR
    2. Create your own proxy server:
      1. Subscribe to ADEO Anonymous HTTPS Proxy (https://aws.amazon.com/marketplace/pp/prodview-lo2helasbjorq)
      2. Create an AWS EC2 instance
      3. Attach an elastic IP
      4. That’s it. This elastic IP is your proxy server IP!
  6. Create a headless browser instance using this proxy IP (The library has inbuilt functions for this)
  7. Call the profile details functions from the library. 
  8. Parse each JSON record and store it as a row in a CSV
  9. Import into your database, if applicable.
  10. Run static DB queries to populate the delta counts.
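Step 8 in the list above (JSON record to CSV row) can be sketched on its own. This is only an illustration: the sample records are made up, the field names are borrowed from the schema later in this post, and `flatten` and `records_to_csv` are hypothetical helpers, not functions from the TikTok-Api library.

```python
import csv
import io


def flatten(record, parent_key="", sep="."):
    """Flatten nested dicts into a single level, joining keys with `sep`."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


def records_to_csv(records):
    """Turn a list of (possibly nested) JSON records into CSV text."""
    rows = [flatten(record) for record in records]
    # Collect the union of column names, in first-seen order.
    fieldnames = []
    for row in rows:
        for key in row:
            if key not in fieldnames:
                fieldnames.append(key)
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)  # missing columns are written as empty strings
    return buffer.getvalue()


# Made-up sample records, for illustration only.
sample = [
    {"user": {"uniqueId": "example_handle"},
     "stats": {"followerCount": 120, "heartCount": 900}},
    {"user": {"uniqueId": "another_handle"},
     "stats": {"followerCount": 45}},
]
csv_text = records_to_csv(sample)
print(csv_text)
```

Writing to an in-memory buffer keeps the sketch easy to test; for a real run you would write to a file and bulk-import it into your database (step 9).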

The full scraper roughly looks like this:

import requests
from TikTokApi import TikTokApi
import json
 
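# Custom helper base class (e.g. JSON key/value to CSV). Assumption: the class lives in a
# module of the same name; a class cannot inherit from the module object itself.
from utilityFunctions import utilityFunctions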
import time
 
 
 
class Tiktok( utilityFunctions):
 
    def __init__(self,args):
        #initialize global vars
         
        utilityFunctions.__init__(self, args)      
         
        #Put your configurations in a JSON file
        self.data_call = self.extractor_config['data']['call'][0]
         
        #proxy ip for avoiding reCaptcha after too many API Calls, in this case I've setup a proxy server on EC2 that rotates IPs
        self.proxy = self._chainedGet(self.data_call,"proxy_ip")
         
        #api response data file dump path
        self.local_tmp_file = self._chainedGet(self.data_call,"local_tmp_file")
         
        #response file content type
        self.content_type = self._chainedGet(self.data_call,"content_type")
         
        #Delay in seconds before each API call
        self.delay = self._chainedGet(self.data_call,"scrape_delay")
 
        #List of static TikTok handles to be scraped, if you don't use the API or want to add handles manually
        self.static_tiktok_handles = self._chainedGet(self.data_call,"static_tiktok_handles")
         
        if self.delay is None:
            self.delay = 5
         
        #user listing API/ this will give list of users to get the TikTok handles for
        self.user_list_api_url = self._chainedGet(self.data_call,"user_list_api_url")
         
        if self.proxy is not None:
            self.api = TikTokApi.get_instance(proxy=self.proxy)
        else:
            self.api = TikTokApi.get_instance()
         
        self.userList = self.getUserList()
 
        if self.static_tiktok_handles is not None and len(self.static_tiktok_handles)>0:
            self.userList = self.static_tiktok_handles + self.userList
 
    def getTikTokUserLikeList(self,tiktokUsername):
        #To scrape and assemble a JSON of posts Liked by a tiktok handle
        try:
            self.logger.info("Fetching User Likes for : "+tiktokUsername)          
            try:
                likeListDataResponse = self.api.userLikedbyUsername(tiktokUsername,count=2000, language='en')
            except Exception as e:
                self.logger.exception("Cannot fetch user "+tiktokUsername+": "+str(e))
                return
            self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
            time.sleep(self.delay)  
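            #Initialise up front so the return below never hits an undefined name
            likeListData = []
            if likeListDataResponse is not None:
                for item in likeListDataResponse:
                    #Tag each liked post with the handle that liked it
                    item["uniqueId"]=tiktokUsername
                    likeListData.append(item)
 
            return likeListData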
        except Exception as e:
            self.logger.exception("Error in getTikTokUserLikeList: "+str(e))
 
    def getTikTokVideoData(self,transformerObj):
        #To loop over accounts and assemble all users posts details in a list of json objects
        try:
            local_tmp_file = self.local_tmp_file
            content_type = self.content_type
 
            if local_tmp_file is None:
                local_tmp_file = "/tmp/VideoData.json"
 
            if content_type is None:
                content_type = "json"
 
            self.transformerObj = transformerObj
             
            userList = self.userList
 
            TikTokData = []
 
            for idx,username in enumerate(userList): 
                self.logger.info("Total  Users: "+str(len(userList)))
                self.logger.info("Processing user no "+str(idx+1)+"  Username: "+username)
                try:
                    #getTikTokHandle returns a one-element list, or None on failure
                    handleResult = self.getTikTokHandle(username)
                    TikTokUsername = handleResult[0] if handleResult else None
                    if TikTokUsername is None and username in (self.static_tiktok_handles or []):
                        TikTokUsername = username
                
                    if TikTokUsername is not None:
                        self.logger.info("TikTok handle: "+TikTokUsername)
                        self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
                        time.sleep(self.delay)   
                        TikTokUserProfileData = self.getTikTokProfileVideos(TikTokUsername)
                        #getTikTokProfileVideos returns None when the fetch fails
                        if TikTokUserProfileData:
                            TikTokData.extend(TikTokUserProfileData)
                except Exception as e:
                    self.logger.exception("Error in getTikTokVideoData: "+str(e))
             
            data_count = len(TikTokData)
 
            with open(local_tmp_file, 'w', encoding='utf8') as json_file:
                json.dump(TikTokData, json_file, ensure_ascii=False)
             
            data_config = {
            "content_type": content_type
            }
             
            aFileContents = self.extractFileData(data_config, local_tmp_file)
 
            rTransformer = self.transformerObj.transform_process( aFileContents )
 
            self.pagination["total"] += data_count
 
            if( rTransformer != True ):
                self.logger.exception("Transformation Error")
                raise Exception("Transformation Error")
 
            return True
        except Exception as e:
            self.logger.exception("Error in getTikTokVideoData :"+str(e))
         
 
    def getTikTokUserData(self,transformerObj):
        #To loop over accounts and assemble all users profile details in a list of json objects
        try:
            local_tmp_file = self.local_tmp_file
            content_type = self.content_type
 
            if local_tmp_file is None:
                local_tmp_file = "/tmp/UserData.json"
 
            if content_type is None:
                content_type = "json"
 
            self.transformerObj = transformerObj
             
            userList = self.userList
             
 
            TikTokData = []
            for idx,username in enumerate(userList): 
                self.logger.info("Total  Users: "+str(len(userList)))
                self.logger.info("Processing user no "+str(idx+1)+"  Username: "+username)
                try:
                    #getTikTokHandle returns a one-element list, or None on failure
                    handleResult = self.getTikTokHandle(username)
                    TikTokUsername = handleResult[0] if handleResult else None
                    if TikTokUsername is None and username in (self.static_tiktok_handles or []):
                        TikTokUsername = username
 
                    if TikTokUsername is not None:
                        self.logger.info("TikTok handle: "+TikTokUsername)
                        self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
                        time.sleep(self.delay)   
                        TikTokUserProfileData = self.getTikTokProfile(TikTokUsername)
                        #getTikTokProfile returns None when the fetch fails
                        if TikTokUserProfileData:
                            TikTokData.extend(TikTokUserProfileData)
                except Exception as e:
                    self.logger.exception("Error in getTikTokUserData: "+str(e))
 
            data_count = len(TikTokData)
 
            with open(local_tmp_file, 'w', encoding='utf8') as json_file:
                json.dump(TikTokData, json_file, ensure_ascii=False)
             
            data_config = {
            "content_type": "json"
            }
 
            aFileContents = self.extractFileData(data_config, local_tmp_file)
 
            self.pagination["total"] += data_count
            rTransformer = self.transformerObj.transform_process( aFileContents )
 
            if( rTransformer != True ):
                self.logger.exception("Transformation Error")
                raise Exception("Transformation Error")
 
            return True
        except Exception as e:
            self.logger.exception("Error in getTikTokUserData :"+str(e))
         
    def getTikTokUserLikeListData(self,transformerObj):
        #To loop over accounts and assemble all users liked tiktok posts details in a list of json objects
        try:
             
            local_tmp_file = self.local_tmp_file
            content_type = self.content_type
             
 
            if local_tmp_file is None:
                local_tmp_file = "/tmp/UserLikeListData.json"
 
            if content_type is None:
                content_type = "json"
 
            self.transformerObj = transformerObj
             
            userList = self.userList
 
            TikTokData = []
            for idx,username in enumerate(userList): 
                self.logger.info("Total  Users: "+str(len(userList)))
                self.logger.info("Processing user no "+str(idx+1)+"  Username: "+username)
                try:
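                    #getTikTokHandle returns a one-element list, or None on failure
                    handleResult = self.getTikTokHandle(username)
                    TikTokUsername = handleResult[0] if handleResult else None
                     
                    if TikTokUsername is None and username in (self.static_tiktok_handles or []):
                        TikTokUsername = username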
                
                    if TikTokUsername is not None:
                        self.logger.info("TikTok handle: "+TikTokUsername)
                        self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
                        time.sleep(self.delay)   
                        TikTokUserProfileLikeListData = self.getTikTokUserLikeList(TikTokUsername)
                        if TikTokUserProfileLikeListData is not None and len(TikTokUserProfileLikeListData)>0:
                            TikTokData.extend(TikTokUserProfileLikeListData)
                except Exception as e:
                    self.logger.exception("Error in getTikTokUserLikeListData: "+str(e))
 
            data_count = len(TikTokData)
 
            with open(local_tmp_file, 'w', encoding='utf8') as json_file:
                json.dump(TikTokData, json_file, ensure_ascii=False)
             
            data_config = {
            "content_type": "json"
            }
 
            aFileContents = self.extractFileData(data_config, local_tmp_file)
 
            self.pagination["total"] += data_count
            rTransformer = self.transformerObj.transform_process( aFileContents )
 
            if( rTransformer != True ):
                self.logger.exception("Transformation Error")
                raise Exception("Transformation Error")
 
            return True
        except Exception as e:
            self.logger.exception("Error in getTikTokUserLikeListData :"+str(e))
 
    def getUserList(self):
        #This will call an API to get the list of users
        if self.user_list_api_url is None:
            self.user_list_api_url = "<YOUR API URL that returns users>"
         
        UserListResponse = requests.get(self.user_list_api_url, params="", headers="",timeout=5)
        UserListResponseJson = json.loads(UserListResponse.text)
        userList = []
        for user in UserListResponseJson["data"]["data"]:
            userList.append(user["user"]["username"])
        return userList
 
    def getTikTokHandle(self,username):
        #This will call another API to get the tiktok handle for that user
        if username is None:
            self.logger.info("No Tiktok handle for user")
            return
        tiktok_handle = None
 
        try:
            UserProfileAPIURL = "YOUR API URL/get-user-details-endpoint/"+username
            UserProfileResponse = requests.get(UserProfileAPIURL, params="", headers="",timeout=5)
            UserProfileResponseJson = json.loads(UserProfileResponse.text)
             
            for social_network in UserProfileResponseJson["data"]["social_networks"]:       
                tiktok_handle = social_network["tiktok_handle"]
                         
            return [tiktok_handle]
        except Exception as e:
            self.logger.exception("Error in getTikTokHandle: "+str(e))
 
 
    def getTikTokProfileVideos(self,tiktokUsername):
        #To get the stats for tiktok posts / videos
        try:
            self.logger.info("Fetching Posts for user: "+tiktokUsername)
            try:
                profileGenerator = self.api.getUserPager(tiktokUsername, cursor=0)
            except Exception as e:
                self.logger.exception("Cannot fetch user "+tiktokUsername+": "+str(e))
                return
            tikTokList = []
            count = 0
            for item in profileGenerator:
                if item is not None:       
                    self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
                    time.sleep(self.delay)   
                    tikTokList.extend(item)
                    count = count + len(item)
                self.logger.info("Posts fetched:"+str(count))
            return tikTokList
        except Exception as e:
            self.logger.exception("Error in getTikTokProfileVideos: "+str(e))
 
    def getTikTokProfile(self,tiktokUsername):
        #To scrape and assemble a JSON of TikTok Profile Data of a tiktok handle
        try:
            self.logger.info("Fetching User Profile for : "+tiktokUsername)
            try:
                profileDataResponse = self.api.getUser(tiktokUsername, cursor=0)
            except Exception as e:
                self.logger.exception("Cannot fetch user "+tiktokUsername+": "+str(e))
                return
            self.logger.info("Delaying API Call for "+str(self.delay)+" seconds")
            time.sleep(self.delay)   
            profileData = profileDataResponse["userInfo"]
             
            return [profileData]
        except Exception as e:
            self.logger.exception("Error in getTikTokProfile: "+str(e))
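The class above leans on a `_chainedGet` helper and a JSON config that are not shown in this post. As a rough idea only, here is a sketch that assumes `_chainedGet` is a safe nested lookup returning `None` when a key is missing. The function `chained_get` and the config values are hypothetical stand-ins; only the key names come from the code above.

```python
def chained_get(source, *keys, default=None):
    """Walk nested dicts/lists key by key; return `default` if any step is missing."""
    current = source
    for key in keys:
        try:
            current = current[key]
        except (KeyError, IndexError, TypeError):
            return default
    return current


# Hypothetical config, shaped like the lookups in __init__ above.
extractor_config = {
    "data": {
        "call": [
            {
                "proxy_ip": "203.0.113.10:3128",  # placeholder proxy address
                "scrape_delay": 5,
                "static_tiktok_handles": ["example_handle"],
            }
        ]
    }
}

data_call = chained_get(extractor_config, "data", "call", 0)
print(chained_get(data_call, "scrape_delay"))    # 5
print(chained_get(data_call, "local_tmp_file"))  # None, so the caller falls back to a default path
```

Returning `None` for missing keys is what lets the class fall back to defaults such as the 5-second delay and the `/tmp/...json` dump paths.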

If you’re going to store it in a database, you can create a schema like the one below:

/*Table for TikTok UserData*/
create table IF NOT EXISTS orgUserData(
    org_id bigint(20) NOT NULL,
    userId bigint(20) NOT NULL,
    uniqueId varchar(255) NOT NULL,
    nickname varchar(255) DEFAULT NULL,
    userCreatedDate datetime DEFAULT NULL,
    avatarThumb text DEFAULT NULL,
    secUid varchar(255) DEFAULT NULL,
    privateAccount tinyint(1) DEFAULT '0',
    secret tinyint(1) DEFAULT '0',
    unique (org_id)
);
 
/*Table for TikTok UserStats*/
create table IF NOT EXISTS orgUserStats(
    userId bigint(20) NOT NULL,
    followerCount bigint(20) DEFAULT NULL,
    followingCount bigint(20) DEFAULT NULL,
    heartCount bigint(20) DEFAULT NULL,
    videoCount bigint(20) DEFAULT NULL,
    diggCount bigint(20) DEFAULT NULL,
    unique(userId)
);
 
 
/*Table for TikTok Posts meta Info*/
create table IF NOT EXISTS orgContentData(
    contentId bigint(20) NOT NULL,
    contentTitle varchar(255) DEFAULT NULL,
    contentDescription text,
    contentCreatedDate datetime DEFAULT NULL,
    contentCover text DEFAULT NULL,
    contentOriginCover text DEFAULT NULL,
    contentType varchar(10) DEFAULT 'video',
    PRIMARY KEY (contentId)
);
 
/*Table for TikTok Posts & User Data Mapping with Content Stats*/
CREATE TABLE IF NOT EXISTS orgUserContentUploadData (
    userId bigint(20) NOT NULL,
    contentId bigint(20) NOT NULL,
    diggCount bigint(20) DEFAULT NULL,
    shareCount bigint(20) DEFAULT NULL,
    commentCount bigint(20) DEFAULT NULL,
    playCount bigint(20) DEFAULT NULL,
    PRIMARY KEY (contentId),
    CONSTRAINT orgUserContentUploadData_ibfk_1 FOREIGN KEY (contentId) REFERENCES orgContentData (contentId)
);
 
/*Table for TikTok Posts Stats*/
create table IF NOT EXISTS orgContentStats(
    contentId bigint(20) NOT NULL,
    diggCount bigint(20) DEFAULT NULL,
    shareCount bigint(20) DEFAULT NULL,
    commentCount bigint(20) DEFAULT NULL,
    playCount bigint(20) DEFAULT NULL,
    PRIMARY KEY (contentId),
    CONSTRAINT orgContentDataInStats_ibfk_1 FOREIGN KEY (contentId) REFERENCES orgContentData (contentId)
);
 
 
/*Table for TikTok User Likes*/
create table IF NOT EXISTS orgUserLikes(
    userId bigint(20) NOT NULL,
    contentId bigint(20) NOT NULL,
    UNIQUE KEY user_content_unqiue (userId, contentId)
);
 
/*For Delta counts*/
select
    distinct cd.userId,
    cd.contentId,
    tmp.diggCount - cd.diggCount as diggDiff,
    tmp.shareCount - cd.shareCount as shareDiff,
    tmp.commentCount - cd.commentCount as commentDiff,
    tmp.playCount - cd.playCount as playDiff
from
    orgContentDataTmp tmp
    join orgUserContentUploadData cd on cd.contentId = tmp.contentId;
 
/*Table to store stats delta per day*/
/*We use a different DB for this, can you guess ? */
CREATE TABLE orgUserContentUploadDataDeltas (
    userId bigint NOT NULL, /* TikTok IDs do not fit in a 32-bit int */
    contentId bigint NOT NULL,
    diggCount int DEFAULT NULL :: int,
    shareCount int DEFAULT NULL :: int,
    commentCount int DEFAULT NULL :: int,
    playCount int DEFAULT NULL :: int,
    date_time timestamp DEFAULT (now()) :: timestamptz(6)
);
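If you would rather compute the daily deltas in the script than in SQL, the same join-and-subtract idea can be sketched in Python. This assumes you keep two snapshots keyed by `contentId`, with the newer one playing the role of the `tmp` table in the query above; the function name and the sample numbers are made up for illustration.

```python
STAT_FIELDS = ("diggCount", "shareCount", "commentCount", "playCount")


def compute_deltas(previous, current):
    """Return per-post stat differences for posts present in both snapshots.

    `previous` and `current` map contentId -> dict of stat counts.
    """
    deltas = []
    for content_id, new_stats in current.items():
        old_stats = previous.get(content_id)
        if old_stats is None:
            continue  # like the inner join: skip posts with no earlier snapshot
        row = {"contentId": content_id}
        for field in STAT_FIELDS:
            row[field] = new_stats.get(field, 0) - old_stats.get(field, 0)
        deltas.append(row)
    return deltas


# Made-up snapshots for illustration.
yesterday = {
    101: {"diggCount": 10, "shareCount": 2, "commentCount": 1, "playCount": 100},
}
today = {
    101: {"diggCount": 15, "shareCount": 2, "commentCount": 4, "playCount": 180},
    102: {"diggCount": 3, "shareCount": 0, "commentCount": 0, "playCount": 20},
}
deltas = compute_deltas(yesterday, today)
print(deltas)
```

Post 102 is skipped because it has no earlier snapshot to compare against, which mirrors what the inner join in the SQL version does.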

In the End 🥁🥁🥁🥁   –

Now you can run an analysis on this data!
Use it for machine learning, running campaigns with influencers,
monitoring account stats for optimizing engagement and lots more.

Want to get this set up, with detailed reporting, analytics dashboards with useful KPIs, and tracking?
You can check out Catchmedia Inc (https://www.catchmedia.com/). We’ve got an awesome team 😎

If you have any questions or suggestions for this post, please leave a comment.
Thanks for checking this out! 🙂