文章内容
2020/4/19 18:02:05,作 者: 黄兵
Python 登录移动查询话费
通过 Python 爬虫登录 10086(北京移动)网站,查询话费及通话详单。
# -*- coding: utf-8 -*-
# @Time : 2019-02-22 09:52
# @Author : cxa
# @File : beijing_crawler.py
# @Software: PyCharm
"""Log in to the 10086 (China Mobile, Beijing) web portal and print call details.

The flow driven by the ``__main__`` section is:

1. ``send_sms_code`` asks the portal to send an SMS verification code.
2. ``login`` prompts for that code (and the service password) and posts it,
   RSA-encrypted, to the login endpoint.
3. ``bill`` follows the single-sign-on redirect to ``service.bj.10086.cn`` and
   prints the call-detail pages.
"""
import base64
import random
import re
import time
from io import BytesIO

import requests
from Cryptodome.Cipher import PKCS1_v1_5
from Cryptodome.PublicKey import RSA
from PIL import Image

# Public key used to encrypt the SMS code before it is posted to the login
# endpoint (see ``BeiJingCrawler.get_password``).
_RSA_PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsgDq4OqxuEisnk2F0EJF
mw4xKa5IrcqEYHvqxPs2CHEg2kolhfWA2SjNuGAHxyDDE5MLtOvzuXjBx/5YJtc9
zj2xR/0moesS+Vi/xtG1tkVaTCba+TV+Y5C61iyr3FGqr+KOD4/XECu0Xky1W9Zm
maFADmZi7+6gO9wjgVpU9aLcBcw/loHOeJrCqjp7pA98hRJRY+MML8MK15mnC4eb
ooOva+mJlstW6t/1lghR8WNV8cocxgcHHuXBxgns2MlACQbSdJ8c6Z3RQeRZBzyj
fey6JCCfbEKouVrWIUuPphBL3OANfgp0B+QG31bapvePTfXU48TYK0M5kE+8Lgbb
WQIDAQAB
-----END PUBLIC KEY-----"""


class BeiJingCrawler:
    """Session-based client for the 10086 Beijing login and call-detail pages.

    All requests go through one ``requests.Session`` so that cookies set by
    the login endpoints are reused by the later calls.
    """

    # Default headers for the login.10086.cn endpoints. Instances work on
    # their own copy (made in ``__init__``) so that per-request tweaks do not
    # leak into other instances through this shared class attribute.
    headers = {
        "Accept": "application/json, text/javascript, */*; q=0.01",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "Host": "login.10086.cn",
        "Origin": "https://login.10086.cn",
        "Referer": "https://login.10086.cn/html/bj/iloginnew.html?1551250969321",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36",
        "X-Requested-With": "XMLHttpRequest",
    }

    # Console messages for the response bodies of sendRandomCodeAction.
    _SMS_SEND_MESSAGES = {
        "0": "验证码发送成功!",
        "1": "验证码发送频繁,请一分钟后再重试!",
        "2": "验证码发送频繁,已经超过最大次数",
    }

    # A captcha made of exactly three CJK characters.
    _ZH_CAPTCHA_RE = re.compile("^[\u4E00-\u9FA5]{3}$")

    def __init__(self, phone):
        """Create a crawler for one phone number.

        Args:
            phone: The mobile number used as the account / ``userName``.
        """
        self.session = requests.Session()
        self.phone_number = phone
        # Per-instance copy: methods below mutate ``self.headers``.
        self.headers = dict(type(self).headers)
        # Filled in by ``login``; defined here so later reads never raise
        # AttributeError when ``login`` was called with an explicit code.
        self.sms_code = None
        self.sms_ser = None

    def get_timestamp(self):
        """Return the current Unix time in milliseconds as an int."""
        return int(time.time() * 1000)

    def check_uid_available(self):
        """GET ``checkUidAvailable.action`` and return the response."""
        url = "https://login.10086.cn/checkUidAvailable.action"
        return self.session.get(url, headers=self.headers)

    def chk_number_action(self):
        """POST the phone number to ``chkNumberBjAction.action``.

        The original author documented the endpoint as answering ``true`` when
        the number belongs to Beijing Mobile and ``false`` otherwise.

        Returns:
            The raw response; callers inspect ``.text``.
        """
        url = "https://login.10086.cn/chkNumberBjAction.action"
        payload = {"userName": self.phone_number}
        return self.session.post(url, headers=self.headers, data=payload)

    def loadToken(self):
        """POST the phone number to ``loadToken.action`` and return the response.

        ``send_sms_code`` reads the token from the ``result`` key of the JSON
        body.
        """
        url = "https://login.10086.cn/loadToken.action"
        payload = {"userName": self.phone_number}
        return self.session.post(url, headers=self.headers, data=payload)

    def sendRandomCodeAction(self, result):
        """Ask the portal to send the SMS verification code.

        Args:
            result: Token obtained from ``loadToken``; it is sent back in the
                ``Xa-before`` request header.

        Returns:
            The raw response; ``send_sms_code`` interprets ``.text``.
        """
        # Stored on the instance's own header dict, so it is also sent with
        # every later request made with ``self.headers``.
        self.headers["Xa-before"] = result
        url = "https://login.10086.cn/sendRandomCodeAction.action"
        payload = {
            "userName": self.phone_number,
            "type": "POST",
            "channelID": "00100",
        }
        return self.session.post(url, headers=self.headers, data=payload)

    def sendflag(self, ):
        """GET ``sendflag.htm`` with the current millisecond timestamp."""
        url = "https://login.10086.cn/sendflag.htm"
        query = {"timestamp": self.get_timestamp()}
        return self.session.get(url, headers=self.headers, params=query)

    @staticmethod
    def _is_affirmative(text):
        """Return False for an empty body or a literal ``false`` answer."""
        # A plain truthiness test would accept the string "false".
        return bool(text) and text.strip().lower() != "false"

    def send_sms_code(self):
        """Run the request sequence that triggers the SMS verification code.

        Prints the token, the ``sendflag`` response body and a status message
        for the send request. Returns ``None``; it stops silently when the
        availability check or the Beijing-number check does not pass.
        """
        availability = self.check_uid_available()
        if availability.json().get("code") != "1":
            return

        number_check = self.chk_number_action()
        if not self._is_affirmative(number_check.text):
            return

        token = self.loadToken().json().get("result")
        print("token", token)
        flag_response = self.sendflag()
        print("r4", flag_response.text)

        send_response = self.sendRandomCodeAction(token)
        status_message = self._SMS_SEND_MESSAGES.get(send_response.text)
        if status_message is not None:
            print(status_message)

    def get_password(self, k):
        """Encrypt ``k`` with the portal's RSA public key.

        Args:
            k: Plain-text string to encrypt (``login`` passes the SMS code).

        Returns:
            The PKCS#1 v1.5 ciphertext, base64-encoded, as a ``str``.
        """
        public_key = RSA.import_key(_RSA_PUBLIC_KEY_PEM)
        cipher = PKCS1_v1_5.new(public_key)
        ciphertext = cipher.encrypt(bytes(k, encoding='utf-8'))
        return str(base64.b64encode(ciphertext), encoding='utf-8')

    def login(self, code=None):
        """Post the SMS code to the login endpoint.

        Args:
            code: SMS verification code. When empty or ``None`` the user is
                prompted on stdin for both the SMS code and the service
                password (the latter is kept in ``self.sms_ser`` for ``bill``).

        Returns:
            The raw login response.
        """
        if code:
            self.sms_code = code
        else:
            self.sms_code = input("请输入短信验证码:\n")
            self.sms_ser = input("请输入服务密码")

        login_url = "https://login.10086.cn/touchBjLogin.action"
        payload = {
            "account": self.phone_number,
            "accountType": "01",
            "channelID": "00100",
            "password": self.get_password(self.sms_code),
            "protocol": "https:",
            "pwdType": "02",
            "rememberMe": "0",
            "timestamp": self.get_timestamp(),
        }
        return self.session.post(login_url, headers=self.headers, data=payload)

    def verify_captcha(self, code):
        """Check an image-captcha answer with the portal.

        Args:
            code: The text the user read from the captcha image.

        Returns:
            The ``resultCode`` value from the JSON response (``bill`` treats
            ``"0"`` as success).
        """
        if self.checkzh(code) == 1:
            # A three-character Chinese captcha is sent hex-encoded.
            code = self.str_to_hex(code)
        response = self.session.get(
            f"https://login.10086.cn/verifyCaptcha?inputCode={code}")
        # The response object has no ``.get``; the field lives in the JSON body.
        return response.json().get("resultCode")

    def str_to_hex(self, s):
        """Return the code points of ``s`` as concatenated lowercase hex."""
        return ''.join(format(ord(ch), 'x') for ch in s)

    def checkzh(self, s):
        """Return 1 if ``s`` is exactly three CJK characters, otherwise 0."""
        return len(self._ZH_CAPTCHA_RE.findall(s))

    def get_verify_code_img(self):
        """Download the login captcha image and open it in an image viewer."""
        url = "https://login.10086.cn/captchazh.htm?type=12"
        raw = self.session.get(url).content
        Image.open(BytesIO(raw)).show()

    def get_authimg(self):
        """Download the shop.10086.cn auth image and open it in a viewer."""
        url = f"https://shop.10086.cn/i/authImg?t={random.random()}"
        raw = self.session.get(url).content
        Image.open(BytesIO(raw)).show()

    def bill(self, res):
        """Finish the login and print the call-detail pages.

        While the login result is not ``"0"`` the user is asked for a new SMS
        code and an image captcha, and the login is retried. After a
        successful login the single-sign-on redirect is followed, the service
        password is checked, and the call details for the hard-coded months
        are printed.

        Args:
            res: The response returned by ``login``.

        Raises:
            KeyError: If the sign-on response carries no ``Location`` header.
        """
        login_json = res.json()
        print("登录状态", login_json)
        result_code = login_json.get("result")

        while result_code != "0":
            sms_code = input("请输入正确的短信验证码:")
            print("开始发送图片验证码")
            self.get_verify_code_img()
            img_code = input("获取图片验证码,请输入图片验证码")
            captcha_result = self.verify_captcha(img_code)
            while captcha_result != "0":
                img_code = input("图片验证码错误,请输入图片验证码:")
                captcha_result = self.verify_captcha(img_code)
            print("图片验证码输入正确")
            res = self.login(sms_code)
            # The result code is a field of the JSON body, not of the response.
            result_code = res.json().get("result")
        print("短信验证码输入正确")

        login_json = res.json()
        print(login_json)
        assert_accept_url = login_json.get("assertAcceptURL")
        artifact = login_json.get("artifact")
        init_url = "https://service.bj.10086.cn/poffice/package/ywcx.action?PACKAGECODE=CXNEW"
        sso_params = {
            "artifact": artifact,
            "backUrl": init_url,
        }
        print("artifactUrl", assert_accept_url)

        # Work on a copy so the Host/Referer overrides for the service domain
        # do not overwrite the headers used for login.10086.cn.
        headers = dict(self.headers)
        headers.update({'Host': 'service.bj.10086.cn'})
        # NOTE(review): verify=False disables TLS certificate verification for
        # this request (and one below); kept as in the original, but it
        # should be removed if the host's certificate chain validates.
        sso_response = self.session.get(
            assert_accept_url, headers=headers, params=sso_params,
            verify=False, allow_redirects=False)
        redirect_url = sso_response.headers['Location']

        headers.update({'Referer': ''})
        # The body is not used; the request is made for its effect on the
        # session (presumably cookies for the service domain).
        self.session.get(url=redirect_url, headers=headers)

        bill_detail_url = "https://service.bj.10086.cn/poffice/package/xdcx/xdcxShow.action?PACKAGECODE=XD"
        headers.update({'Referer': 'http://service.bj.10086.cn/poffice/jsp/service/fee/fee.jsp'})
        bill_response = self.session.get(bill_detail_url, headers=headers, verify=False)
        print(bill_response.text)

        if self.sms_ser is None:
            # ``login`` was only ever called with an explicit code, so the
            # service password has not been collected yet.
            self.sms_ser = input("请输入服务密码")

        yzmcheck_url = "https://service.bj.10086.cn/poffice/package/xdcx/userYzmCheck.action"
        check_payload = {
            "yzCheckCode": self.sms_ser,
            "PACKAGECODE": "XD",
        }
        check_response = self.session.post(yzmcheck_url, headers=headers, data=check_payload)
        message = check_response.json().get("message")

        # ``message`` may be absent from the JSON; treat that as a failure
        # instead of crashing on ``None.upper()``.
        if (message or "").upper() != "Y":
            print("登录失败", message)
            return

        headers.update(
            {'Referer': 'https://service.bj.10086.cn/poffice/package/xdcx/xdcxShow.action?PACKAGECODE=XD'})
        xdsxshow_url = "https://service.bj.10086.cn/poffice/package/xdcx/xdcxShow.action"
        for item in ("201901", "201812"):
            month_payload = {
                "PACKAGECODE": "XD",
                "xdFlag": "GSM",
                "month": item,
            }
            month_response = self.session.post(xdsxshow_url, headers=headers, data=month_payload)
            print(f"{item}通话详单", month_response.text)

    def base64encode(self, passwd):
        """Return ``passwd`` (a ``str``) base64-encoded, as a ``str``."""
        return base64.b64encode(bytes(passwd, encoding='utf-8')).decode("utf-8")


if __name__ == '__main__':
    num = "xxxxxxx"
    bj = BeiJingCrawler(num)
    bj.send_sms_code()  # send the SMS verification code
    res = bj.login()
    bj.bill(res)
文章出处:北京移动
评论列表