Python:手机归属地与IP归属地的本地查询
客户要做代发微信红包的活动,无一例外地招来黑产的攻击盗刷作弊。
和黑产斗智斗勇内耗严重。
活动数据始终容易被伪造,所以思路最后还是落在增加他们的违法成本上,让其食之无味知难而退。
主要方式是针对可疑的IP建立黑名单机制,因为更换IP或者找寻特定地理位置的代理的IP成本极高,即使它们在目标地区,更换动态IP也能减缓其速度。
需求前提铺垫到此为止,以下是实现过程。
考虑到这种查库存在误差和过时失效等精确度的问题。IP归属地和 手机号码归属地进行结合判定,提高判断结果的可信程度。
查询手机号码归属地
沿用早前已经写过的一篇
https://gen8.orz.com.cn/blog/index.php/archives/201
用python的 phone 库实现
安装
pip install phone
实现
from phone import Phone
p = Phone()
res = p.find('13922121234')
print(res)
print(res["province"], res["city"])
# {'phone_type': '移动', 'city': '广州', 'phone': '13922121234', 'area_code': '020', 'province': '广东', 'zip_code': '510000'}
# 广东 广州
查询IP归属地
使用纯真IP库方案
库文件
https://github.com/out0fmemory/qqwry.dat
参考实现方法
https://www.cnblogs.com/fatalord/p/10779393.html
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# filename: ipseeker.py
import socket
import struct
class CzIp:
def __init__(self, db_file='./data/qqwry.dat'):
self.f_db = open(db_file, "rb")
bs = self.f_db.read(8)
(self.first_index, self.last_index) = struct.unpack('II', bs)
self.index_count = int((self.last_index - self.first_index) / 7 + 1)
self.cur_start_ip = None
self.cur_end_ip_offset = None
self.cur_end_ip = None
# print(self.get_version(), " 纪录总数: %d 条 "%(self.index_count))
def get_version(self):
'''
获取版本信息,最后一条IP记录 255.255.255.0-255.255.255.255 是版本信息
:return: str
'''
s = self.get_addr_by_ip(0xffffff00)
return s
def _get_area_addr(self, offset=0):
if offset:
self.f_db.seek(offset)
bs = self.f_db.read(1)
(byte,) = struct.unpack('B', bs)
if byte == 0x01 or byte == 0x02:
p = self.getLong3()
if p:
return self.get_offset_string(p)
else:
return ""
else:
self.f_db.seek(-1, 1)
return self.get_offset_string(offset)
def _get_addr(self, offset):
'''
获取offset处记录区地址信息(包含国家和地区)
如果是中国ip,则是 "xx省xx市 xxxxx地区" 这样的形式
(比如:"福建省 电信", "澳大利亚 墨尔本Goldenit有限公司")
:param offset:
:return:str
'''
self.f_db.seek(offset + 4)
bs = self.f_db.read(1)
(byte,) = struct.unpack('B', bs)
if byte == 0x01: # 重定向模式1
country_offset = self.getLong3()
self.f_db.seek(country_offset)
bs = self.f_db.read(1)
(b,) = struct.unpack('B', bs)
if b == 0x02:
country_addr = self.get_offset_string(self.getLong3())
self.f_db.seek(country_offset + 4)
else:
country_addr = self.get_offset_string(country_offset)
area_addr = self._get_area_addr()
elif byte == 0x02: # 重定向模式2
country_addr = self.get_offset_string(self.getLong3())
area_addr = self._get_area_addr(offset + 8)
else: # 字符串模式
country_addr = self.get_offset_string(offset + 4)
area_addr = self._get_area_addr()
return country_addr + " " + area_addr
def dump(self, first, last):
'''
打印数据库中索引为first到索引为last(不包含last)的记录
:param first:
:param last:
:return:
'''
if last > self.index_count:
last = self.index_count
for index in range(first, last):
offset = self.first_index + index * 7
self.f_db.seek(offset)
buf = self.f_db.read(7)
(ip, of1, of2) = struct.unpack("IHB", buf)
address = self._get_addr(of1 + (of2 << 16))
print("%d %s %s" % (index, self.ip2str(ip), address))
def _set_ip_range(self, index):
offset = self.first_index + index * 7
self.f_db.seek(offset)
buf = self.f_db.read(7)
(self.cur_start_ip, of1, of2) = struct.unpack("IHB", buf)
self.cur_end_ip_offset = of1 + (of2 << 16)
self.f_db.seek(self.cur_end_ip_offset)
buf = self.f_db.read(4)
(self.cur_end_ip,) = struct.unpack("I", buf)
def get_addr_by_ip(self, ip):
'''
通过ip查找其地址
:param ip: (int or str)
:return: str
'''
if type(ip) == str:
ip = self.str2ip(ip)
L = 0
R = self.index_count - 1
while L < R - 1:
M = int((L + R) / 2)
self._set_ip_range(M)
if ip == self.cur_start_ip:
L = M
break
if ip > self.cur_start_ip:
L = M
else:
R = M
self._set_ip_range(L)
# version information, 255.255.255.X, urgy but useful
if ip & 0xffffff00 == 0xffffff00:
self._set_ip_range(R)
if self.cur_start_ip <= ip <= self.cur_end_ip:
address = self._get_addr(self.cur_end_ip_offset)
else:
address = "未找到该IP的地址"
return address
def get_ip_range(self, ip):
'''
返回ip所在记录的IP段
:param ip: ip(str or int)
:return: str
'''
if type(ip) == str:
ip = self.str2ip(ip)
self.get_addr_by_ip(ip)
range = self.ip2str(self.cur_start_ip) + ' - ' \
+ self.ip2str(self.cur_end_ip)
return range
def get_offset_string(self, offset=0):
'''
获取文件偏移处的字符串(以'\0'结尾)
:param offset: 偏移
:return: str
'''
if offset:
self.f_db.seek(offset)
bs = b''
ch = self.f_db.read(1)
(byte,) = struct.unpack('B', ch)
while byte != 0:
bs += ch
ch = self.f_db.read(1)
(byte,) = struct.unpack('B', ch)
return bs.decode('gbk')
def ip2str(self, ip):
'''
整数IP转化为IP字符串
:param ip:
:return:
'''
return str(ip >> 24) + '.' + str((ip >> 16) & 0xff) + '.' + str((ip >> 8) & 0xff) + '.' + str(ip & 0xff)
def str2ip(self, s):
'''
IP字符串转换为整数IP
:param s:
:return:
'''
(ip,) = struct.unpack('I', socket.inet_aton(s))
return ((ip >> 24) & 0xff) | ((ip & 0xff) << 24) | ((ip >> 8) & 0xff00) | ((ip & 0xff00) << 8)
def getLong3(self, offset=0):
'''
3字节的数值
:param offset:
:return:
'''
if offset:
self.f_db.seek(offset)
bs = self.f_db.read(3)
(a, b) = struct.unpack('HB', bs)
return (b << 16) + a
'''
if __name__ == '__main__':
cz = CzIp()
ip = '113.65.22.187'
print(cz.get_addr_by_ip(ip))
# 广东省广州市 电信
'''
PHP调用执行python
上述两项结合后的python脚本可以作为后台服务持续或定时处理一些历史数据。但对于来自客户端的实时响应,需要从php端实时获得查询结果。
由于不想重新用php实现多一次,想着怎样通过php去调用python执行结果。
网上很多方案使用 exec()
, system()
之类方法去在系统命令行层面执行python,但很多云主机上这样的方法是被禁用的,即使手动打开了php.ini 上的配置,也是无法使用,且不说禁用的原因确实存在安全隐患没有必要给自己增加安全风险,于是采用了下面的方案。
启用 Python3的 http服务,使python脚本作为一个http api 运行
只要py的http端口和lnmp端口不冲突,运行在非默认端口即可,且云主机配置防火墙不对公开放这个特殊端口,可以杜绝外部的风险又不影响本机php脚本通过localhost 访问。是一个比较取巧的办法。
不过要注意py脚本的稳定性健壮性,需要持续运行的。
实现代码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# http_server.py
from http.server import HTTPServer, BaseHTTPRequestHandler
import os,sys
import io,shutil
host = ('localhost', 9876) #http服务运行端口
class Resquest(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
enc = "UTF-8"
encoded = ''.join(self.path).encode(enc)
f = io.BytesIO()
f.write(encoded)
f.seek(0)
shutil.copyfileobj(f, self.wfile)
if __name__ == '__main__':
server = HTTPServer(host, Resquest)
print("Starting server, listen at: %s:%s" % host)
server.serve_forever()
上述脚本是把URL上的路径参数打印输出
运行这个脚本
python3 http_server.py
Starting server, listen at: localhost:9876
尝试在本机浏览器访问
http://localhost:9876/123abc
浏览器显示 或通过 php 的 file_get_contents('http://localhost:9876/123abc')
捕获结果
/123abc
于是很我们可以通过地址对 python传参
举例 http://localhost:9876/13800138000
我们获得参数手机号码的方式是在上文的python脚本里
phone_num = self.path[1:]
去掉开头第一个字符 /
即可得到 13800138000
。
最后结合上文全部的代码
PHP
//判断是不是广东用户(需要手机或IP任一命中),返回 true 为外省人
function outlander($phone,$ip){
$result = false; //默认false,以防服务宕机
$out = file_get_contents("http://localhost:9876/".$phone."/".$ip); //url传两个参数
if($out == '11'){ //无效手机号情况的返回值,判定为无效
$result = true;
}elseif(strstr($out,'广东')){
$result = false;
}else{ //更新,ip和phone都不是广东属性
$result = true;
}
return $result;
}
$ip = $_SERVER['REMOTE_ADDR'];
$phone = "13380070777";
$isOutlander= outlander($phone,$ip); //False
//对判断结果 使用的业务逻辑 略
Python3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from http.server import HTTPServer, BaseHTTPRequestHandler
import os,sys
import io,shutil
import re
from phone import Phone
from ipseeker import CzIp
p = Phone()
def isPhone(phone):
#phone='14737373737'
if len(phone)==11:
#rp=re.compile('^0\d{2,3}\d{7,8}$|^1[358]\d{10}$|^147\d{8}')
rp = re.compile('^1\d{10}')
return rp.match(phone)
else:
return False
host = ('localhost', 9876)
class Resquest(BaseHTTPRequestHandler):
def do_GET(self):
parts = self.path.split("/") #需要确保传入参数是 /phone/IP 格式
phone_num = parts[-2]
ip_addr = parts[-1]
cz = CzIp()
ip_local = cz.get_addr_by_ip(ip_addr) #IP归属地
if (isPhone(phone_num)):
res = p.find(phone_num)
if res is not None:
#直出,两个地址,都不含广东才会被php端拦截
out = res["province"] + res["city"] + "," + ip_local
else:
# 无匹配归属地信息或不是有效手机号码
out = '11'
else:
# 不是手机号码
out = '11'
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
#self.wfile.write(self.path)
enc = "UTF-8"
encoded = ''.join(out).encode(enc)
f = io.BytesIO()
f.write(encoded)
f.seek(0)
shutil.copyfileobj(f, self.wfile)
if __name__ == '__main__':
server = HTTPServer(host, Resquest)
print("Starting server, listen at: %s:%s" % host)
server.serve_forever()
后端运行,启动http服务
nohup python3 http_server.py &
测试运行结果(PHP) pyexec.php
$ip = $_SERVER['REMOTE_ADDR'];
$out = file_get_contents("http://localhost:9876/".$_GET["phone"]."/".$ip);
var_dump($out);
浏览器访问 http://localhost/pyexec.php?phone=13380070777
的结果
string(38) "广东广州,广东省广州市 电信"
