MENU

几个爬虫小栗子

2020 年 01 月 19 日 • 派森编程

年前水一篇,最近还是在看python,基础看完之后就去看数据库了,数据库之后就是web了,Django&Flask之类的,先是html入门,这个看完了,css看到一半看不下去了,js一点没看,所以我决定先跳过这一块,暂时去看看别的,再下来就是爬虫方面的东西了,个人感觉这个东西还是蛮有意思的,提起了我的兴趣,看了之后发现没点前端web方面的基础玩起来会很吃力,之后也自己试着写了很多,像是什么登陆某些网站、爬取某些网站的数据,当然下面的几个小栗子也都是入门级的,也都是基于python3,先来尝试登陆某个网站获取一些登录后才能拿到的数据。

登陆GitHub

具体要做什么,就是能登陆到GitHub,登录后访问电子邮件设定页面,在页面源码里获取到当前登录的用户名、绑定的邮箱地址,还有就是访问安全日志页面,获取第一页的信息,也就是账户的登录记录,包括登陆状态、国家、IP,日期,大概就是这样,这些东西都是登陆之后才能获取到的信息,所以先想办法登陆到github

先来分析一下页面,先点开浏览器的F12,打开持续日志,打开目标网址,也就是https://github.com/login,页面加载完成之后在开发者模式里面点开这个,也就是loginresponse,一会需要看这里面的源码

然后输入用户名密码点登陆吧,登陆成功之后就可以去看开发者工具了,找一个URIsessionPOST请求,也就是这个,这个就是登陆接口,看一下他都提交了哪些信息,

commit: Sign in
utf8: ✓
authenticity_token: 3/YrvJJIf3pYYs8pjn8wTeww/ElnHvLLt6MNLS50CEPeLOcgjDg2OdExK4q8nzFpqK2KRbq5YLBftXXeRWE2OQ==
ga_id: 123228193.1576807919
login: xxx
password: xxxx
webauthn-support: supported
webauthn-iuvpaa-support: unsupported
required_field_4988: 
timestamp: 1578547909624
timestamp_secret: 58ecf2f5b220668cedd903552595e7efd24455840dad422a1ed05e6562b23d94

分析一下提交的数据,前两个值都是固定的,一会发送POST请求的时候写死就可以了。

authenticity_token,这个值是login页面会返回给我们,一会把这个值取出来带上

ga_id值是存在cookie里面的,一会也把他取出来带上。

login为用户名或邮箱地址,自行输入即可

password为登陆密码,自己输入即可

required_field_4988这个key并没有值,这个key也是动态的,也在login页面源码里

timestamp是时间戳,这个时间戳也是页面返回给我们的,一会把他取出来带上

timestamp_secret也是login页面返回给我们,一会把他取出来带上,

所以我们需要把这些数据在页面源码或cookies里取出来,发送登陆请求的时候带上就行了,当然用户名密码除外,手动输入即可,

authenticity_token

ga_id

required_field_4988

timestamp

timestamp_secret

搞清楚这些之后就可以写代码了撒,用两个模块就够了,分别是re/requestsrequests用来帮我们发送请求,re帮我们拿到我们想要的东西就够了,所以代码如下,requests自行安装撒。

import re
import requests


class Login_github(object):
    def __init__(self):
        # 初始化session对象
        self.request_session = requests.session()
        # 定义一个空列表,用于存在登陆记录
        self.login_record = []
        # 默认请求头
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/79.0.3945.88 Safari/537.36",
        }
        # github首页
        self.index_url = 'https://github.com'
        # 请求登陆页面
        self.response = self.request_session.get(url='%s' % self.index_url + '/login', headers=self.header)

    # 获取ga_id方法
    def ga_id(self):
        cookie = self.response.cookies['_octo']
        ga_id_search = re.compile(r'(\d{2,10}\.\d{2,10})')
        return re.search(ga_id_search, cookie).group(1)

    # 获取authenticity_token值方法
    def authenticity_token(self):
        authenticity_token_search = re.compile(r'authenticity_token.\svalue="(.*?)"')
        return re.search(authenticity_token_search, self.response.text).group(1)

    # 获取时间戳方法
    def timestamp(self):
        timestamp_search = re.compile(r'name="timestamp"\svalue="(\d{13})"')
        return re.search(timestamp_search, self.response.text).group(1)

    # 获取required_field方法
    def required_field(self):
        required_field_search = re.compile(r'name="(\w+)?"\shidden')
        return re.search(required_field_search, self.response.text).group(1)

    # 获取timestamp_secret方法
    def timestamp_secret(self):
        timestamp_secret_search = re.compile(r'timestamp_secret.\svalue="(.*?)"')
        return re.search(timestamp_secret_search, self.response.text).group(1)

    # 登陆方法
    def Login(self):
        username = input('用户名或邮箱地址: ')
        password = input('密码: ')
        # 定义
        post_data = {
            "commit": "Sign in",
            "utf8": "✓",
            "ga_id": self.ga_id(),
            "authenticity_token": self.authenticity_token(),
            "login": username,
            "password": password,
            "webauthn-support": "supported",
            "webauthn-iuvpaa-support": "unsupported",
            "timestamp": self.timestamp(),
            self.required_field(): '',
            "timestamp_secret": self.timestamp_secret()
        }
        # 发送登陆POST请求
        self.request_session.post(url='%s' % self.index_url + '/session', headers=self.header, data=post_data)

    # 查询登陆记录,一页最多50条记录
    def login_session(self):
        # 正则匹配
        login_search = re.compile(r'<span class="audit-type">\s+.+\s.+\s+('
                                  r'\w+\.\w+)\s+.+\s.+\s.+\s+.+\s.+\s+.+\s+.+\s+.+?>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,'
                                  r'3}).+\s+.+\s+(\w+.+)\s\s+.+\s.+">?(.*)<.+')
        # 请求安全日志页面
        login_info = re.findall(login_search, self.request_session.get(url='%s' % self.index_url +'/settings/security-log',headers=self.header).text)
        for login in range(len(login_info)):
            info_dict = {
                'status': login_info[login][0],
                'ip-add': login_info[login][1],
                'country': login_info[login][2],
                'date': login_info[login][3]
            }
            self.login_record.append(info_dict)

    # 获取用户名、邮箱、最近登陆国家、日期、IP地址信息
    def run(self):
        # 尝试登陆
        try:
            self.Login()
        except Exception as e:
            print(e)
        # 访问设置页面
        response = self.request_session.get(url='%s' % self.index_url + '/settings/emails', headers=self.header).text
        try:
            # 正则匹配用户名
            user_name_search = re.compile(r'strong\sclass="css-truncate-target">(.*?)<')
            # 正则匹配邮箱
            email_search = re.compile(r'option\sselected="selected"\svalue="\d+">(.+?)<')
            # 打印用户名
            print('你的用户名为:%s' % re.search(user_name_search, response).group(1))
            # 打印邮箱
            print('你的邮箱为: %s' % re.search(email_search, response).group(1))
            try:
                self.login_session()
            except Exception as e:
                print(e)
            print('登陆记录如下')
            for i in range(len(self.login_record)):
                print(self.login_record[i])
        except:
            print('没有检索到用户名/邮箱地址,请确认你输入的用户名密码是否正确,也有可能出现了验证码,看看你的邮箱撒')


if __name__ == '__main__':
    github = Login_github()
    github.run()

登陆成功的输出如下,输出用户名、账户绑定的邮箱,和安全页面第一页的登陆记录,一页是50条,图片我就是截了一部分,总之效果就是这样

当然也有登陆失败的时候,登陆失败输出如下,

上图登陆失败的原因就是出现了验证码,没办法继续了,之前可能就是太频繁了,还有种可能就是用户名密码错误,如果你的邮箱没有收到验证码就说明你的用户名密码有问题,还有可能就是网络超时,网络超时的话会抛出ConnectionError的错误,还有时候会抛出证书错误,这个不知道是为什么,估计也是网络造成的,大概就是这样,这个小脚本就算完事了,下面来一个稍微复杂的。

爬取豆瓣top250

这个比上面的复杂一些,用到了re/requests/threading/lxml/pymongopymongo其实可以不用,我只是将爬取到的数据存到了mongodb中,在不用mongodb的情况不用安装,然后这个脚本就可以用了,如果不了解xpath/lxml可以去了解一下,不是特别复杂,前提是你必须有点前端基础,否则看起来也很吃力,lxml是要额外安装的,自行使用pip命令安装一下吧。

要爬的目标页面是这个,这个是第一页,一会需要生成URL,暂时还没用到selenium ,所以想翻页就只能自行生成URL了,要获取到的内容分别是电影名、导演、评分、评价数、简介,首先是mongo_db.py,根据自己的情况自行调整吧,目前就是进行了简单的插入操作,并没有做别的操作,当然不准备入库的话也不需要这个。

from pymongo import MongoClient


class Mongo_client(object):
    def __init__(self):
        try:
            client = MongoClient(host='192.168.1.200', port=37017)
            client.admin.authenticate('admin', 'Sowhat?')
            my_db = client['douban']
            self.collection = my_db['collection_top250']
        except Exception as e:
            print(e)

    def insert_db(self, item):
        self.collection.insert_one(item)


insert_data = Mongo_client()

然后是爬虫文件,

import re
import requests
import threading
from lxml import etree
from tools.mongo_db import insert_data
from concurrent.futures import ThreadPoolExecutor


class HandleDoubanMovieTop250(object):
    def __init__(self):
        # 定义请求头
        self.header = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,'
                      'application/signed-exchange;v=b3;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/79.0.3945.88 Safari/537.36 ',
            'Host': 'movie.douban.com'
        }
        # 一个空列表,用于存放构建后的URL
        self.page_url = []
        # 定义线程锁
        self.lock = threading.Lock()
        # 定义正则匹配
        self.movie_re = re.compile(r'\s+')

    # 构建页面地址
    def handle_page_url(self):
        page = 0
        while page <= 25:
            self.page_url.append('https://movie.douban.com/top250?start=%d' % page)
            page += 25

    # 发送get请求,返回页面信息
    def handle_request(self, page_url):
        # 将页面实例化并返回
        return etree.HTML(requests.get(url=page_url, headers=self.header).text)

    # 开始处理数据
    def handle_page_detail(self, page_url):
        # 调用实例化函数
        response = self.handle_request(page_url)
        # 开始处理数据
        all_div = response.xpath('//div[@class="info"]')
        for html in all_div:
            movie_dict = {
                # 电影名
                'movie_name': self.movie_re.sub('', ''.join(html.xpath('./div/a/span/text()'))),
                # 导演
                'actors_information': self.movie_re.sub('', "".join(
                    html.xpath(".//div[@class='bd']/p[@class='']/text()"))),
                # 评分
                'score': html.xpath('.//span[@class="rating_num"]/text()')[0],
                # 评价数
                'evaluate': html.xpath('.//div[@class="star"]/span[last()]/text()')[0],
                # 简介
                'describe': html.xpath('.//span[@class="inq"]/text()')[0],
            }
            # 入库,如果不入库直接打印movie_dict也可以
            with self.lock:
                insert_data.insert_db(movie_dict)

    def run(self):
        # 生成要爬取的页面
        self.handle_page_url()
        # 定义线程池
        executor = ThreadPoolExecutor(max_workers=10)
        # 开跑
        for url in self.page_url:
            executor.submit(self.handle_page_detail, url)
        # 关闭线程池
        executor.shutdown()


if __name__ == '__main__':
    # 实例化
    top250 = HandleDoubanMovieTop250()
    # 开跑
    top250.run()

然后开跑,跑完之后去看一哈mongodb

截了一部分,总条数250条就对了,

由于用了多线程,所以数据插入的顺序是乱的,像是入库的第一条萤火虫之墓,他是排在151,本该是第一位的肖申克的救赎在13条,想避免这个问题就不要用多线程了,大概就是这样,下面用selenium 来搞点事情。

爬取51job职位

这个主要就是用到了etree/selenium/pymongo,当然不入库一样可以不用pymongo,目标网址就是这里,也就是前程无忧的职位搜索页面,全国范围的,一会的操作就是用代码去调用浏览器,在这之前需要先去下载浏览器驱动,目前只支持Phantomjs/Chrome/Firefox,我这里用的Chrome的驱动,在这里可以找到,看一下自己浏览器版本去自行下载吧,下载完成之后加入到环境变量,在任意位置打开命令行运行chromedriver --versioin能看到如下信息就算成功了。

然后自行安装一下selenium这个包就可以了,大概要做的就是调用浏览器打开目标页面,判断搜索框有没有出现,如果出现了输入要搜索的职位信息,输入之后点击搜索按钮获取数据,接下来就是判断有没有下一页了,如果有则点击下一页进行翻页,如果没有就退出,这个我打算扔到centos7上面去跑了,所以在centos也要安装Chrome和他的驱动才行,先把这件事情搞了。

# 安装浏览器
yum -y install https://dl.google.com/linux/direct/google-chrome-stable_current_x86_64.rpm
# 下载驱动
wget http://chromedriver.storage.googleapis.com/79.0.3945.36/chromedriver_linux64.zip
unzip chromedriver_linux64.zip -d /usr/local/bin/
chmod +x /usr/local/bin/chromedriver
# 安装需要的包
pip3 install lxml pymongo selenium -i https://pypi.tuna.tsinghua.edu.cn/simple

代码如下

from lxml import etree
from selenium import webdriver
from tools.mongo_db import insert_data
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC


class Search_51job(object):
    def __init__(self):
        chrome_options = Options()
        # root用户需要加上这个
        chrome_options.add_argument('--no-sandbox')
        # 关闭GPU加速
        chrome_options.add_argument('--disable-gpu')
        # 设置无头浏览器
        chrome_options.add_argument('--headless')
        # 设置浏览器
        self.driver = webdriver.Chrome(chrome_options=chrome_options)

    # 页码处理方法
    def handle_job(self):
        # 定义请求地址
        self.driver.get('https://search.51job.com/list/000000,000000,0000,00,9,99,%2520,2,1.html')
        # 判断kwdselectid元素是否出现,这个就是搜索框
        if WebDriverWait(self.driver, 5, 0.5).until(EC.presence_of_element_located((By.ID, 'kwdselectid'))):
            # 如果搜索框出现了就输入要搜索的职位
            input_keyword = input('请输入要查找的岗位信息: ')
            # 向搜索框输入要搜索的职位
            self.driver.find_element_by_id('kwdselectid').send_keys(input_keyword)
            # 点击搜索按钮
            self.driver.find_element_by_class_name('p_but').click()
            # 开始处理数据,先判断搜索结果是否出现
            if WebDriverWait(self.driver, 5, 0.5).until(EC.presence_of_element_located((By.ID, 'resultList'))):
                while True:
                    # 如果出现则将当前页面交给网页处理方法
                    self.handle_parse(self.driver.page_source)
                    # 再进行判断这个xpath的值是不是下一页
                    if self.driver.find_element_by_xpath('//li[@class="bk"][2]/a').text == '下一页':
                        # 如果是则点击这个位置进行翻页
                        self.driver.find_element_by_xpath('//li[@class="bk"][2]/a').click()
                    else:
                        # 如果没有出现下一页则跳出循环
                        break
            # 关闭浏览器
            self.driver.quit()

    # 网页处理方法
    def handle_parse(self, page_source):
        html_51job = etree.HTML(page_source)
        all_div = html_51job.xpath("//*[@id='resultList']//div[@class='el']")
        for item in all_div:
            info_dict = {'position': item.xpath('./p/span/a/@title')[0],
                         'company': item.xpath(".//span[@class='t2']/a/@title")[0],
                         'place': item.xpath("./span[@class='t3']/text()")[0]}
            # 月薪这里有的没数据,这里进行了异常处理,如果没数据则等于无数据
            try:
                info_dict['salary'] = item.xpath("./span[@class='t4']/text()")[0]
            except IndexError:
                info_dict['salary'] = '无数据'
            info_dict['issue_date'] = item.xpath("./span[@class='t5']/text()")[0]
            # 打印获取到的信息
            print(info_dict)
            # 入库
            insert_data.insert_db(info_dict)


if __name__ == '__main__':
    search_51job = Search_51job()
    search_51job.handle_job()

服务器目录结果如下,

[root@rj-bai ~/51job]# tree .
.
├── 51_selenium.py
└── tools
    └── mongo_db.py

1 directory, 2 files

mongo_db.py内容如下,

[root@rj-bai ~/51job]# cat tools/mongo_db.py 
from pymongo import MongoClient


class Mongo_client(object):
    def __init__(self):
        try:
            client = MongoClient(host='192.168.1.200', port=37017)
            client.admin.authenticate('admin', 'Sowhat?')
            my_db = client['51job']
            self.collection = my_db['collection_51job']
        except Exception as e:
            print(e)

    def insert_db(self, item):
        self.collection.insert_one(item)


insert_data = Mongo_client()

这次我把要入库的信息也打印到屏幕上了,准备开跑,

[root@rj-bai ~/51job]# python3 51_selenium.py 
请输入要查找的岗位信息: 运维

跑了几秒钟就停掉了,获取到了五页数据,其实这个不用selenium就可以搞定,用requests/lxml/re就够了,总页数在页面源码里都可以看到,取到总页数后写个方法构造URL就可以了,总之能不用selenium就不用selenium,比较占用资源,毕竟要调用浏览器的,规则就是你想要的东西都在页面源码里你就不需要selenium,如果遇到动态页面就需要用这个了,譬如说P站。

下载P站图片

这个的话暂时就用selenium去搞了,搞个最简单的,当前我并没有登陆,所以在未登录状态去访问某个页面,把那一页的缩略图拿下来就行了,暂时不搞翻页,我是个三蹦子玩家,所以搞点三蹦子图片下来,目标页面这里

目前要做的事情就是把通过selenium获取到全部缩略图的地址,然后把缩略图下载到本地,因为P站比较特殊,他的很多东西都是在网页源代码里看不到的,都是动态填充的,先不考虑直接去调用他的接口,这个接口调用之后返回的就是我们想要的数据,之后会提到这个,就先用selenium+requests搞定这个,selenium帮我们拿到想要的数据,requests帮我们去下载就可以了,大概就是这样,现在用requests去访问一下目标地址,看一下源码,

import requests

print(requests.get(url='https://www.pixiv.net/tags/%E5%B4%A9%E5%9D%8F3/illustrations').text)

可以看到是找不到任何img标签的,所以你用requests+xpath去取数据,不好意思取不到,所以这时候用selenium去获取数据了,代码如下,这个写的比较随便,

import os
import time
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# 请求地址
index_url = 'https://www.pixiv.net/tags/%E5%B4%A9%E5%9D%8F3/illustrations'

pixiv = webdriver.Chrome()
# 请求目标地址
pixiv.get(url=index_url)
# 定义请求头,否则下载图片403
header = {
    "referer": index_url
}

# 开始判断这个元素有没有出现
try:
    if WebDriverWait(pixiv, 10, 3).until(EC.presence_of_element_located((By.XPATH, "//section[@class='sc-LzMkr jkOmhS']"))):
        # 开始向下滚动页面,滚动3次,每次1024像素
        for i in range(3):
            pixiv.execute_script('window.scrollBy(0,1024)')
            # 等待五秒,等待页面加载
            time.sleep(5)
        # 通过xpath来匹配图片URL
        image_list = pixiv.find_elements_by_xpath("//div[@class='sc-fzXfPL fRnGwV']/img")
        print('当前获取图片的总数为: %s ' % len(image_list))
        for item in image_list:
            # 缩略图URL
            thumbnail_url = item.get_attribute('src')
            print('当前下载的图片为: %s' % thumbnail_url)
            # 下载图片到本地
            with open('C:/Users/Administrator/Desktop/pixiv/%s' % os.path.basename(thumbnail_url), 'wb') as f:
                f.write(requests.get(headers=header, url=thumbnail_url).content)
except Exception as e:
    print(e)
finally:
    pixiv.quit()

代码就是这样,图片应该是64张,热门作品缩略图四张,一页搜索结果六十张,开跑

这样就把图片下载下来了撒,不过都是缩略图,这就是个小栗子,针对动态页面的,其实不太建议用这种方式去获取数据,按我目前的理解只要是动态页面一般都是ajax渲染数据的,包括P站,所以找到ajax请求地址去获取数据会比这个好很多,下面开始直接去请求ajax地址来获取数据。

正式开搞

做个简单点的,登录到P站,输入关键字之后去搜索,分析的全过程这里不多扯了,说重点,处于登陆状态,譬如我搜索三蹦子的时候页面发送了异步请求,请求地址如下,

https://www.pixiv.net/ajax/search/artworks/%E5%B4%A9%E5%9D%8F3?word=%E5%B4%A9%E5%9D%8F3&order=date_d&mode=all&p=1&s_mode=s_tag&type=all

解码看一下这是个什么地址,

from urllib.parse import unquote
print(unquote('https://www.pixiv.net/ajax/search/artworks/%E5%B4%A9%E5%9D%8F3?word=%E5%B4%A9%E5%9D%8F3&order=date_d&mode=all&p=1&s_mode=s_tag&type=all', 'utf-8'))

那两个崩坏3应该就是关键字,p=1就是页码,然后访问一下这个地址,看看返回的是啥子,

import requests
print(requests.get(url='https://www.pixiv.net/ajax/search/artworks/%E5%B4%A9%E5%9D%8F3?word=%E5%B4%A9%E5%9D%8F3&order=date_d&mode=all&p=1&s_mode=s_tag&type=all').text)

返回的是json数据,先复制到网页上解析一下看看都有啥,我直接贴需要的内容了,

这个就是第一页的搜索结果,这是第一页第一条记录的详细信息,截了一部分,一页一共是60条记录,data可以看到60,信息包括什么ID、标题、标签、userID、插画总数都在,重点是哪个illustId,这个就是详情页的ID,取到这个通过构造URL就可以去访问详情页了。

在我访问详情页的时候发现点问题,如果是单图的情况下图片的地址在页面源码里是可以看到的,这个没问题,如果是图册,点进去之后只能看到一张图片,需要点击查看全部才能看到全部的图片,而且就算点开了查看全部在页面源码里也看不到图片地址,我看了一下异步请求,果然是点击查看全部请求之后又有异步请求发出去了,也就是这个

那串数字就是illustId,他返回的也是json,这个详情页是有两张图,看一下这个json的视图,如下,

body里面两条数据,经过分析original就是原图地址了,所以,要做的事情就是通过代码先登录,当然也可以不登录,请求搜索接口,解析返回的json,取出illustId,也就是详情页ID,取出之后构造出详情页异步请求地址,然后再发请求到这个地址,解析返回的json取出original地址,也就是原图的地址,取到地址之后下载保存到本地就可以了。

至于页码逻辑暂时是这样写的,通过上面的方式获取到插画总数,总数除60强制进位,就得到页数了,但是有个问题,如果登录了就可以获取所有的数据,在不登陆的情况下只能查看前十页数据,就算是访问第11页的数据返回的也是第十页的,所以代码了进行了判断,在没有登录的情况下,如果页码大于10就重新赋值为10,如果不大于10则不进行任何操作,登录状态下获取全部数据。

登录这一块我并不是通过用户名密码去登陆的,而是通过cookies,我也试过用requests/selenium去登陆,但是问题很大,十有八九要求我进行验证,偶尔会登陆成功,搞得我头大,还是我某些地方写的有问题,最后换了一个比较靠谱的方式,那就是用cookies,这个cookies需要你自己获取一下,也很简单,就是在浏览器登陆之后F12,找到这个,

看请求头里面的cookie值,也就是这一串,

复制出来就可以了,然后代码如下,需要自行使用pip安装两个包,也就是colorama/requests,装完之后就可以运行了,注意线程不要开太多,超时时间自行调整吧,代码里设置的10

import os
import re
import sys
import math
import json
import requests
import threading
from colorama import Fore
from multiprocessing import Queue


# 获取图片地址类
class Get_Picture_Url(threading.Thread):
    def __init__(self, thread_name, page_queue, image_queue, search_name, cookies):
        super(Get_Picture_Url, self).__init__()
        # 线程名
        self.thread_name = thread_name
        # 页码队列
        self.page_queue = page_queue
        # 图片队列
        self.image_queue = image_queue
        # 搜索名字
        self.search_name = search_name
        # cookies
        self.cookies = cookies
        # 定义请求头
        self.header = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,"
                      "application/signed-exchange;v=b3;q=0.9",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
            "cache-control": "no-cache",
            "pragma": "no-cache",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "none",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
            'referer': 'https://www.pixiv.net',
            "user-agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/79.0.3945.117 Safari/537.36",
        }

    # 构造URL
    def run(self):
        print(Fore.LIGHTBLUE_EX, '\n\t当前启动的处理页码的线程为:%s ' % self.thread_name)
        while not url_flag:
            try:
                # block默认为True,设置成False,如果队列为空抛出异常,empty,full
                page = self.page_queue.get(block=False)
                # 构造URL
                page_url = 'https://www.pixiv.net/ajax/search/artworks/%s?word=%s&order=date_d&mode=all&p=%d&s_mode' \
                           '=s_tag&type=all' % (self.search_name, self.search_name, page)
                print(Fore.LIGHTBLUE_EX, "\n\t当前构造的url为:%s" % page_url)
                # 调用获取图片地址方法
                self.Get_Original_Picture_Url(page_url)
            except:
                pass

    # 获取图片地址方法
    def Get_Original_Picture_Url(self, url):
        try:
            print(Fore.LIGHTBLUE_EX, '\n\t当前获取图片地址的线程为:%s ' % self.thread_name)
            # 请求搜索接口
            search_response = requests.get(url=url, headers=self.header, timeout=10, cookies=self.cookies).text
            # 解析返回的json取出illustId
            illustId_path = json.loads(search_response)['body']['illustManga']['data']
            for illustId in illustId_path:
                # 构造详情页地址发送请求并解析返回json
                details_json = requests.get('https://www.pixiv.net/ajax/illust/%s/pages' % illustId['illustId'],
                                            headers=self.header, timeout=10, cookies=self.cookies).text
                # 获取图片地址将地址添加到队列,因为会有图册出现,图片不止一张,只能循环了
                image_path = json.loads(details_json)['body']
                for image in range(len(image_path)):
                    # 将图片地址添加到图片队列
                    self.image_queue.put(image_path[image]['urls']['original'])
        except Exception as e:
            print(e)


# 图片下载类
class Download_Image(threading.Thread):
    def __init__(self, thread_name, image_queue, dir_name):
        super(Download_Image, self).__init__()
        self.thread_name = thread_name
        self.image_queue = image_queue
        # 保存图片的文件夹名称,就是你的搜索的关键字
        self.dir_name = dir_name
        # 保存位置就是脚本当前目录下的搜索名字
        self.path = os.path.dirname(os.path.abspath(__file__)) + '/' + self.dir_name
        # 定义请求头
        self.header = {
            'referer': 'https://www.pixiv.net',
            "user-agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/79.0.3945.117 Safari/537.36",
        }
        # 创建文件夹
        if not os.path.exists(self.path):
            os.mkdir(self.path)

    # 开始下载图片
    def run(self):
        print(Fore.LIGHTBLUE_EX, '\n\t当前启动下载图片的线程为: %s' % self.thread_name)
        while not image_flag:
            try:
                # 拉取图片队列中的数据
                image_url = self.image_queue.get(block=False)
                # 将图片保存到本地
                print(Fore.LIGHTBLUE_EX, '\n\t当前执行下载的线程为: %s ' % self.thread_name)
                with open(self.path + '/' + '%s' % os.path.basename(image_url), 'wb') as f:
                    print(Fore.LIGHTBLUE_EX, '\n\t当前下载的图片为: %s' % image_url)
                    f.write(requests.get(headers=self.header, url=image_url, timeout=10).content)
            except:
                pass


# 立flag
url_flag = False
image_flag = False


def run():
    #  输入要搜索的图片
    search_name = input('\n\t输入要搜索的图片: ')
    cookies = input('\n\t请输入你的cookies,不输入则使用默认cookies: ')
    # cookies字典
    cookies_dict = {}
    # 判断输入
    if len(search_name) == 0:
        print(Fore.LIGHTRED_EX, '\n\t请输入要搜索的内容')
        sys.exit(0)
    else:
        # 判断cookies是否合法
        if len(cookies) == 0:
            print(Fore.LIGHTGREEN_EX,'\n\t未输入cookie,使用默认cookie访问P站')
            cookies_dict = requests.get(url='https://www.pixiv.net', timeout=10).cookies
        else:
            try:
                # 开始解析输入的cookies
                for i in cookies.split(';'):
                    cookies_key, cookies_value = i.split("=", 1)
                    cookies_dict[cookies_key] = cookies_value
                # 访问用户设置页面,获取用户ID/用户昵称
                info_re = re.compile(r'userData":."id":"(?P<user_id>\d+)".+"name":"(?P<user_name>.+?)"')
                user_info = re.search(info_re, requests.get(url='https://www.pixiv.net/setting_user.php',
                                                            cookies=cookies_dict, timeout=10).text)
                print(Fore.LIGHTMAGENTA_EX, '\n\t当前用户ID为: %s' % user_info.groupdict()['user_id'])
                print(Fore.LIGHTMAGENTA_EX, '\n\t当前用户昵称为: %s' % user_info.groupdict()['user_name'])
            except Exception as e:
                print(Fore.LIGHTRED_EX, '\n\t获取用户ID、用户昵称失败,请确保输入的cookies正确和网络正常')
                print('\n\t%s' % e)
                sys.exit(1)
        # 获取图片总数
        url = 'https://www.pixiv.net/ajax/search/artworks/%s?word=%s&order=date_d&mode=all&p=1&s_mode=s_tag&type=all' % (search_name, search_name)
        total = json.loads(requests.get(url=url, timeout=10, cookies=cookies_dict).text)['body']['illustManga']['total']
        # 如果图片数量为0则程序退出
        if total == 0:
            print(Fore.LIGHTRED_EX, '\n\t%s 搜索到的图片为0,换个关键字试试?,程序退出。' % search_name)
            sys.exit(0)
        else:
            # 获取图片总数
            print(Fore.LIGHTCYAN_EX, '\n\t当前获取到的作品数为 %d' % total)
            # 计算出总页数
            total_page = math.ceil(total / 60)
            print(Fore.LIGHTCYAN_EX, '\n\t总页数为: %d' % total_page)
            # 如果处于未登录状态则下载前十页数据
            if type(cookies_dict) != dict and total_page > 10:
                print(Fore.LIGHTCYAN_EX, '\n\t目前处于未登录状态,只能获取前10页数据')
                total_page = 10
                print(Fore.LIGHTCYAN_EX, '\n\t开始下载 1-%d 页的数据' % total_page)
            # 页码队列
            page_queue = Queue()
            # 图片地址队列
            image_queue = Queue()
            # 将页码放入页码队列
            for page in range(1, total_page + 1):
                page_queue.put(page)
            # 返回当前队列的长度
            print(Fore.LIGHTMAGENTA_EX, '\n\t当前页码队列总数为: %s' % page_queue.qsize())
            # 开启3个获取图片地址的线程
            Picture_Threading_list = ['geturl_threading1', 'geturl_threading2', 'geturl_threading3']
            Picture_Threading = []
            for thread_name_page in Picture_Threading_list:
                # 实例化对象
                thread_page = Get_Picture_Url(thread_name_page, page_queue, image_queue, search_name, cookies_dict)
                # 启动线程
                thread_page.start()
                Picture_Threading.append(thread_page)

            # 开启3个下载线程
            Download_Threading_list = ['download_threading1', 'download_threading2', 'download_threading3']
            Download_Threading = []

            for thread_name_parse in Download_Threading_list:
                thread_parse = Download_Image(thread_name_parse, image_queue, search_name)
                # 启动线程
                thread_parse.start()
                Download_Threading.append(thread_parse)

            # 定义线程退出机制
            global url_flag
            while not page_queue.empty():
                pass
            url_flag = True
            # 结束获取地址类线程
            for thread_url in Picture_Threading:
                thread_url.join()
                print("\n\t%s 处理结束" % thread_url.thread_name)

            global image_flag
            while not image_queue.empty():
                pass
            image_flag = True
            # 结束图片下载类线程
            for thread_download in Download_Threading:
                thread_download.join()
                print('\n\t%s 处理结束' % thread_download.thread_name)


if __name__ == '__main__':
    run()

跑一下,不登录的情况下,

开了几秒钟停掉了,下载了这么多,

登录的话就这样,提前准备好你的cookies

也是下了几秒钟就停掉了,

两次搜索的东西都是一样的,不难发现登陆之后比未登录的时候获取到的作品数要多,总之我想要的功能是实现了,其实还有一些需要优化的地方,其一是指定下载的范围,目前是下载一到最后一页,有点太暴力了,其二是某些作品里面可能会有多张图,这个目前还没做判断,只下载了第一张图,如果是多图就新建illustId目录,将对应的图片下载到对应的illustId目录内,目前所有图片都是堆到一起的,再就是GIF图片也有问题,年前是没时间搞这个了,之后有时间会搞什么下载某用户的作品、排行榜之类的功能,根说Github上有一堆有这种功能项目,我也没去了解过,总之还是自己写的用起来比较顺手,也就当练手了。

上面我并没有用爬虫代理,如果你要隐藏自己就需要设置代理了,设置方法其实很简单,就是买这种服务太贵了,我之前用过一个叫阿布云的,这个可以按小时去买,一小时一块钱,每秒请求数5,而且那里也有怎么设置代理的文档,有兴趣可以去了解一下撒,所以,本文结束。

最后编辑于: 2020 年 03 月 05 日
返回文章列表 文章二维码 打赏
本页链接的二维码
打赏二维码