引言
在跨境电商、社交媒体运营以及数据采集的过程中,我们经常会遇到被平台拦截的问题。这是因为各大平台为了保护其正常业务,防止恶意攻击或数据滥用,会限制非人类用户的访问频率。如果我们使用多账号登录或通过自动化工具频繁访问平台,系统很容易识别出异常行为,进而将我们的账号或IP地址列入黑名单,导致访问受限或直接被封禁。这种情况不仅影响了业务的正常开展,还可能导致数据采集中断,甚至丢失重要信息。面对这一问题,目前最有效的解决方案之一是使用动态IP。动态IP的核心优势在于,每次访问时都可以切换到一个新的IP地址,从而降低被平台识别和拦截的概率。与传统的商业数据中心IP不同,动态IP中有一类特殊的动态住宅IP,它们来自真实的家庭网络,能够更好地模拟普通用户的上网行为。
通过使用动态住宅IP服务,我们可以在进行社交媒体数据采集时,大幅减少被平台拦截的风险。无论是跨境电商的竞品分析,还是社交媒体运营的数据监控,动态住宅IP都能为我们提供更加稳定和高效的访问环境。接下来,我们将详细介绍如何配置和使用动态住宅IP服务,并完成社交平台的数据采集工作。
准备
首先我们需要配置动态住宅IP,我问了一下AI,推荐了几家常见的服务商,意义了解之后,我发现亮数据平台正好在促销,性价比是几家中最高的,这次我们就来试用一下。只需要注册账号即可开始使用。登录以后会跳转到工作台,在这里点击获取代理。
之后要进行简单的配置,这里我们只需要填写名称就可以了,其他选项大家可以根据需求选择。
动态住宅代理的计费模式是按照流量计算,使用期间可以随时切换IP地址。
配置好以后建议安装一下,这样可以提高安全性。当然不安装也不会太大的影响,后面需要的时候还可以再安装。
这样就配置好了。在这个页面可以找到我们的主机地址、用户名和密码,旁边有一个样例程序,可以修改它称为我们的爬虫程序。
配置好就可以开始配置社交平台API接口了。在平台中注册为开发者就可以拿到自己的token,在后面的访问需要使用它才能正常接入。
- proxies = {
- 'http': 'http://brd-customer-hl_a0a48734-zone-residential proxy1:[email protected]:33335'
- 'https': 'https://brd-customer-hl_a0a48734-zone-residential proxy1:[email protected]:33335'
- }
我们本次的任务是抓取下图账号的所有帖子和每个帖子点赞数、转发数等指标。在开始之前,我们需要配置一下请求头参数,这个可以直接在网站中获得。在控制台中找到header和cookie的值复制出来就可以了。
找到之后把它们打包在一个类中方便后面使用。
- class CsxqTwitterKeywordSearch:
- def __init__(self,saveFileName,cookie_str):
- self.saveFileName = saveFileName
- self.searchCondition = None
- self.headers = {
- 'headers对应的参数'
- }
- self.cookies = self.cookie_str_to_dict(cookie_str)
接下来我们需要配置一下请求参数,这些需要通过cursor去网站获取。
- def get_params(self,cursor):
- if cursor == "":
- variables = {"rawQuery": self.searchCondition, "count": 20,"querySource": "typed_query", "product": "Latest"}
- params = {
- "variables": json.dumps(variables,separators=(",",":")),
- "features": ""
- }
- else:
- variables = {"rawQuery": self.searchCondition, "count": 20, "cursor": cursor, "querySource": "typed_query", "product": "Latest"}
- params = {
- "variables": json.dumps(variables,separators=(",",":")),
- "features": ""
- }
- return params
之后使用获取到的参数访问并获取元数据,这里需要注意元数据是一个json表单。这里url部分需要将中间替换为自己的token 才能使用。
- def get(self,cursor):
- self.headers["x-csrf-token"] = self.cookies['ct0']
- url = "https://x.com/i/api/graphql/6uoFezW1o4e-n-VI5vfksA/SearchTimeline"
- params = self.get_params(cursor)
- while True:
- try:
- response = requests.get(url,
- headers=self.headers,
- cookies=self.cookies,
- params=params,
- timeout=(3,10),
- proxies=proxies)
- if response.status_code == 429:
- time.sleep(60*20)
- if response.status_code == 200:
- data = response.json()
- return data
- except Exception as e:
- print("搜索接口发生错误:%s" % e)
最后我们需要将目标数据从获取到的元数据中提取出来。这里我们需要提取的是内容、时间、点赞、评论、转发、用户名、简介、粉丝量和关注量,由于元数据是json表单所以只需要简单转换为字典就可以轻松获取。
- def parse_data(self,entries):
- resultList = []
- def transTime(dd):
- GMT_FORMAT = '%a %b %d %H:%M:%S +0000 %Y'
- timeArray = datetime.datetime.strptime(dd, GMT_FORMAT)
- return timeArray.strftime("%Y-%m-%d %H:%M:%S")
-
- contentList = []
- for index, ent in enumerate(entries):
- try:
- entryId = ent.get('entryId', "")
-
- if 'tweet' in entryId:
- l_result = ent['content']['itemContent']['tweet_results']['result'] if ent['content'].get(
- 'itemContent') else None
- if l_result:
- contentList.append(l_result)
- elif "profile-conversation" in entryId:
- items = ent['content']['items']
- for i in items:
- l_result = i['item']['itemContent']['tweet_results']['result'] if i['item'].get(
- 'itemContent') else None
- if l_result:
- contentList.append(l_result)
- except:
- pass
- for l in contentList:
- try:
- result = l.get('tweet') if l.get('tweet') else l
- legacy = result['legacy']
- core = result['core']
- created_at = transTime(legacy.get('created_at'))
- full_text = legacy.get('full_text')
- note_tweet = result.get('note_tweet')
- favorite_count = legacy.get('favorite_count') # 点赞
- reply_count = legacy.get('reply_count') # 回复
- retweet_count = legacy.get('retweet_count', 0)
- quote_count = legacy.get('quote_count', 0)
- retweet_count = retweet_count + quote_count
- if note_tweet:
- try:
- full_text = note_tweet['note_tweet_results']['result']['text']
- except:
- pass
-
- u_legacy = core['user_results']['result']['legacy']
- hash_uname = u_legacy.get('screen_name')
- description = u_legacy['description']
- friends_count = u_legacy['friends_count']
- followers_count = u_legacy.get('followers_count')
-
- item = {"内容":full_text,"时间": created_at,"点赞":favorite_count,"评论":reply_count,"转发":retweet_count,"用户名": hash_uname,"简介": description,\
- "粉丝量":followers_count,"关注量":friends_count}
- print("数据->",item)
- resultList.append(item)
- except:
- pass
- self.save_data(resultList)
最后我们要将数据保存为一个本地csv文件。
- def save_data(self, resultList):
- if resultList:
- df = pd.DataFrame(resultList)
- if not os.path.exists(f'./{self.saveFileName}.csv'):
- df.to_csv(f'./{self.saveFileName}.csv', index=False, mode='a', sep=",", encoding="utf_8_sig")
- else:
- df.to_csv(f'./{self.saveFileName}.csv', index=False, mode='a', sep=",", encoding="utf_8_sig",
- header=False)
- self.resultList = []
- print("保存成功")
整个流程通过一个入口函数控制,在运行的同时打印一些状态信息。
- def run(self,word):
- cursor = ""
- page = 1
- while True:
- # if page > 2:
- # break
- print("正在爬取的页数:%s,cursor:%s"%(page,cursor))
- resqJson = self.get(cursor)
- if not resqJson:
- break
- cursor,entries = self.get_cursor(resqJson)
- if entries:
- self.parse_data(entries)
- page += 1
- else:
- break
- def main(self,fromDate,endDate):
-
- wordList = ["climate change"]
-
- start = 0
- for index,word in enumerate(wordList[start:],start):
- self.searchCondition = f"{word} lang:en until:{endDate} since:{fromDate}"
- print("搜索条件:",self.searchCondition)
- self.run(word)
-
通过一个主函数执行整个程序,这里需要用户粘贴自己的cookie。
- if __name__ == '__main__':
- cookie_str = '改成你自己的cookies'
- fromDate = "2024-08-10"
- endDate = "2024-10-13"
- saveFileName= "Tim_Cook"
- ctks = CsxqTwitterKeywordSearch(saveFileName,cookie_str)
- ctks.main(fromDate,endDate)
运行一下就可以获得结果,可以看到程序运行正常。



评论记录:
回复评论: