客户要做代发微信红包的活动,无一例外地招来黑产的攻击盗刷作弊。
和黑产斗智斗勇内耗严重。

活动数据始终容易被伪造,所以思路最后还是落在增加他们的违法成本上,让其食之无味知难而退。

主要方式是针对可疑的IP建立黑名单机制,因为更换IP或者找寻特定地理位置的代理的IP成本极高,即使它们在目标地区,更换动态IP也能减缓其速度。

需求前提铺垫到此为止,以下是实现过程。

考虑到这种查库存在误差和过时失效等精确度的问题。IP归属地和 手机号码归属地进行结合判定,提高判断结果的可信程度。

查询手机号码归属地

沿用早前已经写过的一篇

https://gen8.orz.com.cn/blog/index.php/archives/201

用python的 phone 库实现

安装

pip install phone

实现

from phone import Phone

p = Phone()
res = p.find('13922121234')

print(res)
print(res["province"], res["city"])

# {'phone_type': '移动', 'city': '广州', 'phone': '13922121234', 'area_code': '020', 'province': '广东', 'zip_code': '510000'}
# 广东 广州

查询IP归属地

使用纯真IP库方案
库文件

https://github.com/out0fmemory/qqwry.dat

参考实现方法
https://www.cnblogs.com/fatalord/p/10779393.html

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# filename: ipseeker.py

import socket
import struct

class CzIp:
    def __init__(self, db_file='./data/qqwry.dat'):
        self.f_db = open(db_file, "rb")
        bs = self.f_db.read(8)
        (self.first_index, self.last_index) = struct.unpack('II', bs)
        self.index_count = int((self.last_index - self.first_index) / 7 + 1)
        self.cur_start_ip = None
        self.cur_end_ip_offset = None
        self.cur_end_ip = None
        # print(self.get_version(), " 纪录总数: %d 条 "%(self.index_count))

    def get_version(self):
        '''
        获取版本信息,最后一条IP记录 255.255.255.0-255.255.255.255 是版本信息
        :return: str
        '''
        s = self.get_addr_by_ip(0xffffff00)
        return s

    def _get_area_addr(self, offset=0):
        if offset:
            self.f_db.seek(offset)
        bs = self.f_db.read(1)
        (byte,) = struct.unpack('B', bs)
        if byte == 0x01 or byte == 0x02:
            p = self.getLong3()
            if p:
                return self.get_offset_string(p)
            else:
                return ""
        else:
            self.f_db.seek(-1, 1)
            return self.get_offset_string(offset)

    def _get_addr(self, offset):
        '''
        获取offset处记录区地址信息(包含国家和地区)
        如果是中国ip,则是 "xx省xx市 xxxxx地区" 这样的形式
        (比如:"福建省 电信", "澳大利亚 墨尔本Goldenit有限公司")
        :param offset:
        :return:str
        '''
        self.f_db.seek(offset + 4)
        bs = self.f_db.read(1)
        (byte,) = struct.unpack('B', bs)
        if byte == 0x01:    # 重定向模式1
            country_offset = self.getLong3()
            self.f_db.seek(country_offset)
            bs = self.f_db.read(1)
            (b,) = struct.unpack('B', bs)
            if b == 0x02:
                country_addr = self.get_offset_string(self.getLong3())
                self.f_db.seek(country_offset + 4)
            else:
                country_addr = self.get_offset_string(country_offset)
            area_addr = self._get_area_addr()
        elif byte == 0x02:  # 重定向模式2
            country_addr = self.get_offset_string(self.getLong3())
            area_addr = self._get_area_addr(offset + 8)
        else:   # 字符串模式
            country_addr = self.get_offset_string(offset + 4)
            area_addr = self._get_area_addr()
        return country_addr + " " + area_addr

    def dump(self, first, last):
        '''
        打印数据库中索引为first到索引为last(不包含last)的记录
        :param first:
        :param last:
        :return:
        '''
        if last > self.index_count:
            last = self.index_count
        for index in range(first, last):
            offset = self.first_index + index * 7
            self.f_db.seek(offset)
            buf = self.f_db.read(7)
            (ip, of1, of2) = struct.unpack("IHB", buf)
            address = self._get_addr(of1 + (of2 << 16))
            print("%d %s %s" % (index, self.ip2str(ip), address))

    def _set_ip_range(self, index):
        offset = self.first_index + index * 7
        self.f_db.seek(offset)
        buf = self.f_db.read(7)
        (self.cur_start_ip, of1, of2) = struct.unpack("IHB", buf)
        self.cur_end_ip_offset = of1 + (of2 << 16)
        self.f_db.seek(self.cur_end_ip_offset)
        buf = self.f_db.read(4)
        (self.cur_end_ip,) = struct.unpack("I", buf)

    def get_addr_by_ip(self, ip):
        '''
        通过ip查找其地址
        :param ip: (int or str)
        :return: str
        '''
        if type(ip) == str:
            ip = self.str2ip(ip)
        L = 0
        R = self.index_count - 1
        while L < R - 1:
            M = int((L + R) / 2)
            self._set_ip_range(M)
            if ip == self.cur_start_ip:
                L = M
                break
            if ip > self.cur_start_ip:
                L = M
            else:
                R = M
        self._set_ip_range(L)
        # version information, 255.255.255.X, urgy but useful
        if ip & 0xffffff00 == 0xffffff00:
            self._set_ip_range(R)
        if self.cur_start_ip <= ip <= self.cur_end_ip:
            address = self._get_addr(self.cur_end_ip_offset)
        else:
            address = "未找到该IP的地址"
        return address

    def get_ip_range(self, ip):
        '''
        返回ip所在记录的IP段
        :param ip: ip(str or int)
        :return: str
        '''
        if type(ip) == str:
            ip = self.str2ip(ip)
        self.get_addr_by_ip(ip)
        range = self.ip2str(self.cur_start_ip) + ' - ' \
                + self.ip2str(self.cur_end_ip)
        return range

    def get_offset_string(self, offset=0):
        '''
        获取文件偏移处的字符串(以'\0'结尾)
        :param offset: 偏移
        :return: str
        '''
        if offset:
            self.f_db.seek(offset)
        bs = b''
        ch = self.f_db.read(1)
        (byte,) = struct.unpack('B', ch)
        while byte != 0:
            bs += ch
            ch = self.f_db.read(1)
            (byte,) = struct.unpack('B', ch)
        return bs.decode('gbk')

    def ip2str(self, ip):
        '''
        整数IP转化为IP字符串
        :param ip:
        :return:
        '''
        return str(ip >> 24) + '.' + str((ip >> 16) & 0xff) + '.' + str((ip >> 8) & 0xff) + '.' + str(ip & 0xff)

    def str2ip(self, s):
        '''
        IP字符串转换为整数IP
        :param s:
        :return:
        '''
        (ip,) = struct.unpack('I', socket.inet_aton(s))
        return ((ip >> 24) & 0xff) | ((ip & 0xff) << 24) | ((ip >> 8) & 0xff00) | ((ip & 0xff00) << 8)

    def getLong3(self, offset=0):
        '''
        3字节的数值
        :param offset:
        :return:
        '''
        if offset:
            self.f_db.seek(offset)
        bs = self.f_db.read(3)
        (a, b) = struct.unpack('HB', bs)
        return (b << 16) + a


'''
if __name__ == '__main__':
    cz = CzIp()
    ip = '113.65.22.187'
    print(cz.get_addr_by_ip(ip)) 
    # 广东省广州市 电信
'''

PHP调用执行python

上述两项结合后的python脚本可以作为后台服务持续或定时处理一些历史数据。但对于来自客户端的实时响应,需要从php端实时获得查询结果。
由于不想重新用php实现多一次,想着怎样通过php去调用python执行结果。

网上很多方案使用 exec(), system() 之类方法去在系统命令行层面执行python,但很多云主机上这样的方法是被禁用的,即使手动打开了php.ini 上的配置,也是无法使用,且不说禁用的原因确实存在安全隐患没有必要给自己增加安全风险,于是采用了下面的方案。

启用 Python3的 http服务,使python脚本作为一个http api 运行

只要py的http端口和lnmp端口不冲突,运行在非默认端口即可,且云主机配置防火墙不对公开放这个特殊端口,可以杜绝外部的风险又不影响本机php脚本通过localhost 访问。是一个比较取巧的办法。

不过要注意py脚本的稳定性健壮性,需要持续运行的。

实现代码

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# http_server.py

from http.server import HTTPServer, BaseHTTPRequestHandler
import os,sys
import io,shutil


host = ('localhost', 9876) #http服务运行端口

class Resquest(BaseHTTPRequestHandler):
    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()        
        enc = "UTF-8"
        encoded = ''.join(self.path).encode(enc)
        f = io.BytesIO()
        f.write(encoded)
        f.seek(0)
        shutil.copyfileobj(f, self.wfile)

if __name__ == '__main__':
    server = HTTPServer(host, Resquest)
    print("Starting server, listen at: %s:%s" % host)
    server.serve_forever()

上述脚本是把URL上的路径参数打印输出

运行这个脚本

python3 http_server.py
Starting server, listen at: localhost:9876

尝试在本机浏览器访问

http://localhost:9876/123abc

浏览器显示 或通过 php 的 file_get_contents('http://localhost:9876/123abc') 捕获结果

/123abc

于是很我们可以通过地址对 python传参

举例 http://localhost:9876/13800138000 我们获得参数手机号码的方式是在上文的python脚本里

phone_num = self.path[1:]

去掉开头第一个字符 / 即可得到 13800138000

最后结合上文全部的代码

PHP

//判断是不是广东用户(需要手机或IP任一命中),返回 true 为外省人
function outlander($phone,$ip){
    $result = false; //默认false,以防服务宕机
    $out = file_get_contents("http://localhost:9876/".$phone."/".$ip);   //url传两个参数
    if($out == '11'){ //无效手机号情况的返回值,判定为无效    
        $result = true;
    }elseif(strstr($out,'广东')){
        $result = false;
    }else{ //更新,ip和phone都不是广东属性
        $result = true;
    }
    return $result;
}

$ip = $_SERVER['REMOTE_ADDR'];
$phone = "13380070777";
$isOutlander= outlander($phone,$ip); //False

//对判断结果 使用的业务逻辑 略

Python3

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from http.server import HTTPServer, BaseHTTPRequestHandler
import os,sys
import io,shutil
import re
from phone import Phone
from ipseeker import CzIp

p = Phone()

def isPhone(phone):
    #phone='14737373737'
    if len(phone)==11:
        #rp=re.compile('^0\d{2,3}\d{7,8}$|^1[358]\d{10}$|^147\d{8}')
        rp = re.compile('^1\d{10}')
        return rp.match(phone)

    else:
        return False


host = ('localhost', 9876)

class Resquest(BaseHTTPRequestHandler):
    def do_GET(self):
        parts = self.path.split("/") #需要确保传入参数是 /phone/IP 格式
        phone_num = parts[-2]
        ip_addr = parts[-1]

        cz = CzIp()
        ip_local = cz.get_addr_by_ip(ip_addr) #IP归属地

        if (isPhone(phone_num)):
            res = p.find(phone_num)
            if res is not None:                
                #直出,两个地址,都不含广东才会被php端拦截
                out = res["province"] + res["city"] + "," + ip_local
            else:
                # 无匹配归属地信息或不是有效手机号码
                out = '11'
        else:
            # 不是手机号码            
            out = '11'

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        #self.wfile.write(self.path)
        enc = "UTF-8"
        encoded = ''.join(out).encode(enc)
        f = io.BytesIO()
        f.write(encoded)
        f.seek(0)
        shutil.copyfileobj(f, self.wfile)

if __name__ == '__main__':
    server = HTTPServer(host, Resquest)
    print("Starting server, listen at: %s:%s" % host)
    server.serve_forever()

后端运行,启动http服务

nohup python3 http_server.py &

测试运行结果(PHP) pyexec.php

$ip = $_SERVER['REMOTE_ADDR'];
$out = file_get_contents("http://localhost:9876/".$_GET["phone"]."/".$ip);
var_dump($out); 

浏览器访问 http://localhost/pyexec.php?phone=13380070777 的结果

string(38) "广东广州,广东省广州市 电信"

标签: php, lbs, 归属地, python3, ip

添加新评论