使用 Python Tornado 实现微信公众号的示例
这里只实现了几个必需功能的例子,离实际可用还有不小的差距:
main.py:
#!/usr/bin/python
#-*- encoding:utf-8 -*-
'''
需要安装:
$ sudo yum install python-devel
$ tar -zxvf pycrypto-2.6.1.tar.gz
$ cd pycrypto-2.6.1
$ sudo python setup.py build
$ sudo python setup.py install
$ sudo yum install libxml2-devel
$ sudo yum install libxml2-python
$ sudo yum install libxml++-devel
$ sudo yum install libxslt-devel
$ sudo pip install lxml
$ sudo pip install requests
后台运行:
$ sudo bash -c 'nohup python ./main.py &'
'''
import os.path
import platform
import hashlib
import time
import sys
import importlib
import urllib2
import requests
import mimetypes
import io
import json
# from xml.etree import cElementTree as ETree
import lxml.etree as ETree
# import torndb
import tornado.httpserver
import tornado.web
import tornado.ioloop
import tornado.options
import tornado.template
from tornado.options import define, options
import global_define
import access_token
from encrypt_WeiXin.WXBizMsgCrypt import WXBizMsgCrypt
# Name of the host operating system, as reported by platform.system().
SYS_PLATFORM = platform.system()

# WeChat only supports port 80.
define("port", default=80, help="run port", type=int)

# Disabled MySQL options, kept for reference:
# define("mysql_host", default="127.0.0.1:3306", help="db host")
# define("mysql_database", default="todo", help="db name")
# define("mysql_user", default="root", help="db user")
# define("mysql_password", default="", help="db password")

# Template and static directories live next to this file.
_BASE_DIR = os.path.dirname(__file__)
TEMPLATE_PATH = os.path.join(_BASE_DIR, "templates")
STATIC_PATH = os.path.join(_BASE_DIR, "static")
class Application(tornado.web.Application):
    """Tornado application that maps the URL routes to their handlers."""

    def __init__(self):
        """Register the routes and pass the template/static settings on."""
        routes = [
            (r"/", IndexHandler),
            (r"/todo/new", NewHandler),
            (r"/todo/(\d+)/edit", EditHandler),
        ]
        app_settings = {
            "template_path": TEMPLATE_PATH,
            "static_path": STATIC_PATH,
        }
        if SYS_PLATFORM == "Windows":
            # debug (and its autoreload) cannot be combined with the
            # multi-process mode, so it is only switched on for Windows,
            # where main() runs a single process.
            app_settings["debug"] = True
        super(Application, self).__init__(routes, **app_settings)
        # Disabled database connection, kept for reference:
        # self.db = torndb.Connection(
        #     host=options.mysql_host,
        #     database=options.mysql_database,
        #     user=options.mysql_user,
        #     password=options.mysql_password
        # )
class IndexHandler(tornado.web.RequestHandler):
    """WeChat official-account callback endpoint mounted at "/".

    GET answers the developer-interface verification handshake (see
    http://mp.weixin.qq.com/wiki/17/2d4265491f12608cd170a95559800f2d.html).
    POST receives pushed messages and events, either plain ("raw") or
    AES-encrypted ("aes"), and writes a passive reply. Whenever a request
    cannot be handled the response body is left empty.
    """

    # Correct MIME parameter is "charset" (the original sent "encoding").
    XML_CONTENT_TYPE = 'text/xml; charset=utf-8'

    @staticmethod
    def _log(level, message):
        """Print one line in the 'Thread(Main), Time: ..., LEVEL: msg' format."""
        print("Thread(Main), Time: %s, %s: %s" % (time.ctime(), level, message))

    def _first_argument(self, name):
        """Return the first value of request argument *name*, or None if absent."""
        values = self.get_arguments(name)
        return values[0] if values else None

    def _set_server_headers(self):
        """Set the identifying response headers sent on every reply."""
        self.set_header("Server", "Car Ocean")
        self.set_header("X-Powered-By", "Car Ocean")

    def get(self):
        """Verify the endpoint: echo ``echostr`` when the signature matches.

        The expected signature is the SHA-1 hex digest of the sorted and
        concatenated token, timestamp and nonce. A request that lacks any of
        the four parameters gets an empty response instead of an error; a
        signature mismatch renders ``index.html``.
        """
        self._set_server_headers()
        signature = self._first_argument("signature")
        if signature is None:
            print("INFO: GET operation, need 'signature' parameter")
            return
        timestamp = self._first_argument("timestamp")
        nonce = self._first_argument("nonce")
        echostr = self._first_argument("echostr")
        if timestamp is None or nonce is None or echostr is None:
            print("INFO: GET operation, need 'timestamp', 'nonce' and 'echostr' parameters")
            return
        token = global_define.TOKEN
        joined = "".join(sorted([token, timestamp, nonce]))
        # hashlib needs bytes; encoding explicitly works on Python 2 and 3.
        calculate_signature = hashlib.sha1(joined.encode("utf-8")).hexdigest()
        # NOTE(review): this prints the secret token to stdout; consider
        # removing it outside of debugging.
        print("token: %s" % token)
        print("timestamp: %s" % timestamp)
        print("nonce: %s" % nonce)
        print("echostr: %s" % echostr)
        print("getted signature: %s" % signature)
        print("calculate signature: %s" % calculate_signature)
        if calculate_signature == signature:
            self.write(echostr)
        else:
            todos = ['ERROR', 'bbb']
            self.render("index.html", todos=todos)

    def post(self):
        """Receive one pushed message or event and write the passive reply."""
        self._set_server_headers()
        body = self.request.body
        self._log("DEBUG", "body: \n%s" % body)
        encrypt_type = self._first_argument("encrypt_type")
        if encrypt_type is None:
            # No encrypt_type argument means a plain-text message.
            encrypt_type = "raw"
        if encrypt_type == "raw":
            # NOTE(review): plain messages are accepted without any signature
            # check, exactly as before - confirm this is intended.
            self._log("INFO", "is not crypt message")
            xml_text = body
        elif encrypt_type == "aes":
            self._log("INFO", "aes crypt message")
            xml_text = self._decrypt_body(body)
            if xml_text is None:
                self.write("")
                return
        else:
            self._log("ERROR", "not support encrypt_type: %s" % encrypt_type)
            return
        try:
            # The body is untrusted input: do not expand XML entities.
            parser = ETree.XMLParser(resolve_entities=False)
            xmlData = ETree.fromstring(xml_text, parser=parser)
        except (ETree.XMLSyntaxError, ValueError) as err:
            self._log("ERROR", "invalid xml body: %s" % err)
            self.write("")
            return
        toUserName = xmlData.findtext("ToUserName")
        self._log("DEBUG", "ToUserName : %s" % toUserName)
        fromUserName = xmlData.findtext("FromUserName")
        self._log("DEBUG", "FromUserName : %s" % fromUserName)
        createTime = xmlData.findtext("CreateTime")
        self._log("DEBUG", "CreateTime : %s" % createTime)
        msgType = xmlData.findtext("MsgType")
        self._log("DEBUG", "MsgType : %s" % msgType)
        if toUserName is None or fromUserName is None:
            # A reply cannot be addressed without both account names.
            self._log("ERROR", "message lacks ToUserName/FromUserName")
            self.write("")
            return
        if msgType == "event":
            self._dispatch_event(xmlData, encrypt_type, toUserName, fromUserName, createTime)
        elif msgType == "text":
            self._dispatch_text(xmlData, encrypt_type, toUserName, fromUserName, createTime)
        else:
            self.write("")

    def _decrypt_body(self, body):
        """Decrypt an AES message body; return the XML text or None on failure."""
        msg_signature = self._first_argument("msg_signature")
        timestamp = self._first_argument("timestamp")
        nonce = self._first_argument("nonce")
        if msg_signature is None or timestamp is None or nonce is None:
            self._log("ERROR", "aes message needs 'msg_signature', 'timestamp' and 'nonce' parameters")
            return None
        wxBizMsgCrypt = WXBizMsgCrypt(global_define.TOKEN, global_define.EncodingAESKey, global_define.APPID)
        ret, decryptBody = wxBizMsgCrypt.DecryptMsg(body, msg_signature, timestamp, nonce)
        self._log("DEBUG", "decrypt body: %s" % decryptBody)
        self._log("DEBUG", "msg_signature: %s" % msg_signature)
        self._log("DEBUG", "timestamp: %s" % timestamp)
        self._log("DEBUG", "nonce: %s" % nonce)
        if ret != 0 or decryptBody is None:
            self._log("ERROR", "DecryptMsg error : %s" % ret)
            return None
        return decryptBody

    def _dispatch_event(self, xmlData, encrypt_type, toUserName, fromUserName, createTime):
        """Route an event message (subscribe / unsubscribe / menu CLICK)."""
        event = xmlData.findtext("Event")
        self._log("DEBUG", "Event : %s" % event)
        if event == "subscribe":
            self.Subscribe(encrypt_type, toUserName, fromUserName, createTime)
        elif event == "unsubscribe":
            self.Unsubscribe()
        elif event == "CLICK":
            eventKey = xmlData.findtext("EventKey")
            self._log("DEBUG", "EventKey : %s" % eventKey)
            self.MenuClick(encrypt_type, toUserName, fromUserName, createTime, eventKey)
        else:
            self.write("")

    def _dispatch_text(self, xmlData, encrypt_type, toUserName, fromUserName, createTime):
        """Route a text message; only the contents "1" and "2" get a reply."""
        Content = xmlData.findtext("Content")
        self._log("DEBUG", "Content : %s" % Content)
        MsgId = xmlData.findtext("MsgId")
        self._log("DEBUG", "MsgId : %s" % MsgId)
        if Content == "1":
            self.RecvTextMsg1(encrypt_type, toUserName, fromUserName, createTime, Content, MsgId)
        elif Content == "2":
            self.RecvTextMsg2(encrypt_type, toUserName, fromUserName, createTime, Content, MsgId)
        else:
            self.write("")

    def _new_reply(self, msgToUserName, msgFromUserName, msgCreateTime, replyType):
        """Build the common reply skeleton with sender and recipient swapped."""
        xmlRoot = ETree.Element("xml")
        ETree.SubElement(xmlRoot, 'ToUserName').text = ETree.CDATA(msgFromUserName)
        ETree.SubElement(xmlRoot, 'FromUserName').text = ETree.CDATA(msgToUserName)
        ETree.SubElement(xmlRoot, 'CreateTime').text = msgCreateTime
        ETree.SubElement(xmlRoot, 'MsgType').text = ETree.CDATA(replyType)
        return xmlRoot

    def _reply_text(self, msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime, text):
        """Send a text reply carrying *text* (unicode for non-ASCII content)."""
        xmlRoot = self._new_reply(msgToUserName, msgFromUserName, msgCreateTime, "text")
        ETree.SubElement(xmlRoot, 'Content').text = ETree.CDATA(text)
        self._send_reply(msgEncrypt_type, xmlRoot)

    def _send_reply(self, msgEncrypt_type, xmlRoot):
        """Serialize *xmlRoot*, encrypt it unless the message was raw, and write it.

        If encryption fails an empty body is written and nothing else; the
        original code went on to write the invalid encryption result.
        """
        xmlData = ETree.tostring(xmlRoot, encoding="utf-8")
        self._log("DEBUG", "Return original message : \n%s" % xmlData)
        if msgEncrypt_type != "raw":
            nonce = str(int(time.time()))
            wxBizMsgCrypt = WXBizMsgCrypt(global_define.TOKEN, global_define.EncodingAESKey, global_define.APPID)
            ret, encryptBody = wxBizMsgCrypt.EncryptMsg(xmlData, nonce)
            if ret != 0:
                self._log("ERROR", "EncryptMsg error : %s" % ret)
                self.write("")
                return
            xmlData = encryptBody
            self._log("DEBUG", "Return encrypted message : \n%s" % xmlData)
        self.set_header('Content-Type', self.XML_CONTENT_TYPE)
        self.write(xmlData)

    def MenuClick(self, msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime, msgEventKey):
        """Reply to a menu CLICK event with a text naming the clicked key."""
        # Chinese text inside the XML must be unicode; a missing key is
        # treated as an empty string instead of raising TypeError.
        self._reply_text(msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime,
                         u"点击了菜单:" + (msgEventKey or u""))

    def RecvTextMsg1(self, msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime, msgContent, msgMsgId):
        """Reply to the text message "1" with a fixed text."""
        self._reply_text(msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime,
                         u"您订阅成功!")

    def RecvTextMsg2(self, msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime, msgContent, msgMsgId):
        """Reply to the text message "2" with a single-article news message."""
        xmlRoot = self._new_reply(msgToUserName, msgFromUserName, msgCreateTime, "news")
        ETree.SubElement(xmlRoot, 'ArticleCount').text = "1"
        articles = ETree.SubElement(xmlRoot, 'Articles')
        item = ETree.SubElement(articles, 'item')
        ETree.SubElement(item, 'Title').text = ETree.CDATA(u"车可讯")
        ETree.SubElement(item, 'Description').text = ETree.CDATA(u"欢迎光临车可讯")
        ETree.SubElement(item, 'PicUrl').text = ETree.CDATA("http://mmbiz.qpic.cn/mmbiz_png/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN7Z3p7x1N002uBiaHsYKTksMNrSUT7icshGNia1xtAYAbCTkiaia4yLqtwWA/0")
        ETree.SubElement(item, 'Url').text = ETree.CDATA("http://cloud.car-ocean.com:8080/")
        self._send_reply(msgEncrypt_type, xmlRoot)

    def Subscribe(self, msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime):
        """Reply to a subscribe event with a welcome text."""
        self._reply_text(msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime,
                         u"您订阅成功!欢迎您的到来!")

    def Unsubscribe(self):
        """Acknowledge an unsubscribe event with an empty body."""
        self.write("")
class NewHandler(tornado.web.RequestHandler):
    """Handler for POST /todo/new."""

    def post(self):
        """Redirect to "/" when a non-empty ``title`` argument was sent."""
        submitted_title = self.get_argument("title")
        if submitted_title:
            self.redirect("/")
class EditHandler(tornado.web.RequestHandler):
    """Handler for /todo/<number>/edit; ``id`` is the number from the URL."""

    def get(self, id):
        """Render ``edit.html`` with the placeholder todo value 1."""
        return self.render("edit.html", todo=1)

    def post(self, id):
        """Redirect back to "/"; the submitted data is not used."""
        self.redirect("/")
class JSONObject(object):
    """Expose the keys of a dict as attributes.

    Meant to be used as ``json.loads(text, object_hook=JSONObject)`` so that
    decoded JSON objects can be read as ``obj.key``. The instance shares the
    given dict as its attribute storage (no copy is made), so changes on
    either side are visible on the other.
    """

    def __init__(self, d):
        """Adopt the dict *d* as this object's attribute dictionary."""
        self.__dict__ = d

    def __repr__(self):
        """Return a debugging representation showing the wrapped dict."""
        return "%s(%r)" % (type(self).__name__, self.__dict__)
def main():
    """Parse the command line, start the HTTP server and run the IO loop."""
    tornado.options.parse_command_line()
    server = tornado.httpserver.HTTPServer(Application())
    if SYS_PLATFORM != "Windows":
        server.bind(options.port)
        # Multi-process mode: 0 starts one process per CPU. It requires
        # debug mode to be off and is not available on Windows.
        server.start(num_processes=0)
    else:
        server.listen(options.port)
    try:
        tornado.ioloop.IOLoop.instance().start()
    except KeyboardInterrupt:
        tornado.ioloop.IOLoop.instance().stop()
if __name__ == "__main__":
    # Create the token thread object and publish it through global_define.
    global_define.accessToken = access_token.AccessToken(1)
    if SYS_PLATFORM != "Windows":
        global_define.accessToken.start()
    # NOTE: from here on the name `access_token` no longer refers to the
    # imported module but to the token string; the examples below use it.
    access_token = global_define.accessToken.GetAccessToken()  # first fetch
    access_token = global_define.accessToken.access_token  # how to read it later

    # The triple-quoted blocks below are disabled examples; remove the quotes
    # around one of them to run it once at startup.
    # NOTE(review): the upload examples pass verify=False, which disables TLS
    # certificate verification - confirm before enabling them.

    # Upload image material: permanent
    """
    fileName = '../car_ocean_logo.png'
    if os.path.isfile(fileName):
        # binary file: read in 'rb' mode and close the handle
        with open(fileName, 'rb') as fileObj:
            fileData = io.BytesIO(fileObj.read())
        # guess_type returns a (type, encoding) tuple; use the type only
        fileType = mimetypes.guess_type(fileName)[0] or 'application/octet-stream'
        fields = {}
        files = {'media': ('car_ocean_logo.png', fileData, fileType)}
        url = "https://api.weixin.qq.com/cgi-bin/material/add_material?access_token="+access_token+"&type=image"
        print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
        request = requests.post(url, verify=False, data=fields, files=files)
        print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), request.content))
    else:
        print("Thread(Main), Time: %s, ERROR: file is not exist: %s" % (time.ctime(), fileName))
    """
    r'''
    Response:
    {"media_id": "EXXGrEsCjc4xXltwe1bKnwygZ1EjuWxZxjmHANE2nGE",
    "url": "http:\/\/mmbiz.qpic.cn\/mmbiz_png\/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN7Z3p7x1N002uBiaHsYKTksMNrSUT7icshGNia1xtAYAbCTkiaia4yLqtwWA\/0?wx_fmt=png"}
    '''

    # Upload thumbnail material: permanent
    """
    fileName = '../car_ocean_logo.jpg'
    if os.path.isfile(fileName):
        with open(fileName, 'rb') as fileObj:
            fileData = io.BytesIO(fileObj.read())
        fileType = mimetypes.guess_type(fileName)[0] or 'application/octet-stream'
        fields = {}
        files = {'media': ('car_ocean_logo.jpg', fileData, fileType)}
        url = "https://api.weixin.qq.com/cgi-bin/material/add_material?access_token="+access_token+"&type=thumb"
        print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
        request = requests.post(url, verify=False, data=fields, files=files)
        print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), request.content))
    else:
        print("Thread(Main), Time: %s, ERROR: file is not exist: %s" % (time.ctime(), fileName))
    """
    r'''
    Response:
    {"media_id":"EXXGrEsCjc4xXltwe1bKn9doo_oPZSwcCrlHVL8h2_U",
    "url":"http:\/\/mmbiz.qpic.cn\/mmbiz_jpg\/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN1300ZIOiasdJdqTerKJ3ootb3b1SzOW0yxtWnqQDDSVkicvSuScicE4og\/0?wx_fmt=jpeg"}
    '''

    # Upload image material: temporary
    """
    fileName = '../car_ocean_logo.png'
    if os.path.isfile(fileName):
        with open(fileName, 'rb') as fileObj:
            fileData = io.BytesIO(fileObj.read())
        fileType = mimetypes.guess_type(fileName)[0] or 'application/octet-stream'
        fields = {}
        files = {'media': ('car_ocean_logo.png', fileData, fileType)}
        url = "https://api.weixin.qq.com/cgi-bin/media/uploadimg?access_token="+access_token+"&type=image"
        print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
        request = requests.post(url, verify=False, data=fields, files=files)
        print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), request.content))
    else:
        print("Thread(Main), Time: %s, ERROR: file is not exist: %s" % (time.ctime(), fileName))
    """
    r'''
    Response:
    {"url":"http:\/\/mmbiz.qpic.cn\/mmbiz_png\/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN7Z3p7x1N002uBiaHsYKTksMNrSUT7icshGNia1xtAYAbCTkiaia4yLqtwWA\/0"}
    '''

    # Upload news (article) material: permanent
    """
    content1 = "欢迎光临车可讯"  # note: Chinese text in JSON material must use the default utf-8 encoding
    fileName = 'templates/test.html'
    if os.path.isfile(fileName):
        with open(fileName) as fileObj:
            content1 = fileObj.read()
    content2 = "测试用"
    data = {
        "articles": [
            {"title": "车可讯",
             "thumb_media_id": "EXXGrEsCjc4xXltwe1bKn9doo_oPZSwcCrlHVL8h2_U",
             "author": "Car Ocean",
             "digest": "",
             "show_cover_pic": 0,
             "content": content1,
             "content_source_url": "http://www.car-ocean.com/"
             },
            {"title": "测试1",
             "thumb_media_id": "EXXGrEsCjc4xXltwe1bKn9doo_oPZSwcCrlHVL8h2_U",
             "author": "Car Ocean",
             "digest": "",
             "show_cover_pic": 0,
             "content": content2,
             "content_source_url": "https://cloud.car-ocean.com:8443/admin/"
             },
        ]
    }
    json_str = json.dumps(data, ensure_ascii=False)  # note: ensure_ascii must be False
    print("Thread(Main), Time: %s, DEBUG: REQUEST JSON: %s" % (time.ctime(), json_str))
    url = "https://api.weixin.qq.com/cgi-bin/material/add_news?access_token="+access_token
    print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
    request = urllib2.Request(url)
    request.add_header("Content-Type", "application/json")
    response = urllib2.urlopen(request, json_str).read()
    dataRtn = json.loads(response, object_hook=JSONObject)
    print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), response))
    if hasattr(dataRtn, "media_id"):
        print("Thread(Main), Time: %s, DEBUG: OUTPUT JSONObject: %s" % (time.ctime(), dataRtn.media_id))
    else:
        dataRtn.media_id = ""
    """
    '''
    Response:
    {"media_id":"EXXGrEsCjc4xXltwe1bKn8uwkLzodNgZ-zdZ3DaL6kc"}
    Delete:
    curl -d '{"media_id":"EXXGrEsCjc4xXltwe1bKn8uwkLzodNgZ-zdZ3DaL6kc"}' https://api.weixin.qq.com/cgi-bin/material/del_material?access_token=hSNOuGcaMdtV-WpgnvplF1ZIOmC8shJKPbeeXUN6L1wyFcBnuzZ71fCxd85dpNYguhgp5V96kuWvOWQTFJjqnSzkp-oydPbuC7lPvSzkEWob8g9_ULpgvsx_AV3xogYfIYGdABAZPX
    '''

    # Mass-send message: at most once per day, requires a verified account
    """
    # Variant 1: send to all followers
    data = {
        "filter": {
            "is_to_all": True
        },
        "mpnews": {
            "media_id": dataRtn.media_id
        },
        "msgtype": "mpnews"
    }
    # Variant 2 (overrides variant 1): send to a single user
    data = {
        "touser": "oJ-k1wLaKRynhjPWoRaa4_vAH36o",
        "mpnews": {
            "media_id": dataRtn.media_id
        },
        "msgtype": "mpnews"
    }
    json_str = json.dumps(data, ensure_ascii=False)
    print("Thread(Main), Time: %s, DEBUG: REQUEST JSON: %s" % (time.ctime(), json_str))
    # url = "https://api.weixin.qq.com/cgi-bin/message/mass/sendall?access_token="+access_token
    url = "https://api.weixin.qq.com/cgi-bin/message/mass/preview?access_token="+access_token  # preview only, for testing
    print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
    request = urllib2.Request(url)
    request.add_header("Content-Type", "application/json")
    response = urllib2.urlopen(request, json_str).read()
    print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), response))
    dataRtn = json.loads(response, object_hook=JSONObject)
    """
    '''
    Response:
    '''

    # Create the custom menu
    """
    data = {
        "button": [
            {
                "type": "click",
                "name": "点击菜单",
                "key": "MENU_1_NONE"
            },
            {
                "name": "含子菜单",
                "sub_button": [
                    {
                        "type": "view",
                        "name": "车可讯官网",
                        "url": "http://www.car-ocean.com/"
                    },
                    {
                        "type": "view",
                        "name": "车可讯下载",
                        "url": "http://cloud.car-ocean.com:8080/"
                    }
                ]
            }
        ]
    }
    json_str = json.dumps(data, ensure_ascii=False)
    print("Thread(Main), Time: %s, DEBUG: REQUEST JSON: %s" % (time.ctime(), json_str))
    url = "https://api.weixin.qq.com/cgi-bin/menu/create?access_token="+access_token
    print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
    request = urllib2.Request(url)
    request.add_header("Content-Type", "application/json")
    response = urllib2.urlopen(request, json_str).read()
    dataRtn = json.loads(response, object_hook=JSONObject)
    print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), response))
    if hasattr(dataRtn, "errmsg"):
        print("Thread(Main), Time: %s, DEBUG: OUTPUT JSONObject: %s" % (time.ctime(), dataRtn.errmsg))
    """
    '''
    Response:
    Delete:
    '''

    main()
access_token.py:
#!/usr/bin/python
#-*- encoding:utf-8 -*-
import time
import threading
import urllib2
import json
import traceback
import signal
import platform
import global_define
# Handle keyboard interrupts and termination signals.
def sigint_handler(signum, frame):
    """Record that a stop signal arrived, then exit via SystemExit.

    The module-level flag ``is_sigint_up`` is set to True so that code
    polling it can stop; the two arguments are not used.
    """
    global is_sigint_up
    print('CATCHED INTERRUPT SIGNAL!')
    is_sigint_up = True
    raise SystemExit('Ctrl+C')
# Route the stop signals to sigint_handler. SIGHUP and SIGTERM are only
# registered when not running on Windows.
_stop_signals = [signal.SIGINT]
if platform.system() != "Windows":
    _stop_signals.extend([signal.SIGHUP, signal.SIGTERM])
for _stop_signal in _stop_signals:
    signal.signal(_stop_signal, sigint_handler)

# Set to True by sigint_handler once a stop signal has been received.
is_sigint_up = False
class AccessToken(threading.Thread):
    """Thread that fetches the WeChat access_token and keeps it refreshed.

    ``access_token`` holds the current token ("" until the first fetch),
    ``expires_in`` its lifetime in seconds as reported by the server, and
    ``weiXinIpList`` the callback IP list returned by ``getcallbackip``.

    NOTE(review): main.py starts this thread before the HTTP server forks
    its worker processes; whether the workers keep seeing refreshed tokens
    should be verified.
    """

    # Seconds to wait for each HTTP request before giving up.
    HTTP_TIMEOUT = 10
    # Seconds to wait before retrying after a failed refresh.
    RETRY_SECONDS = 60

    def __init__(self, number):
        """Create the thread; *number* only labels the log lines."""
        # The base class __init__ must be called when overriding __init__.
        threading.Thread.__init__(self)
        self.number = number
        self.access_token = ""
        self.expires_in = 7200
        self.weiXinIpList = []

    def _log(self, level, message):
        """Print one line in the 'Thread(n), Time: ..., LEVEL: msg' format."""
        print("Thread(%d), Time: %s, %s: %s" % (self.number, time.ctime(), level, message))

    def _http_get(self, url):
        """GET *url*; return (body, headers) and always close the response."""
        httpStream = urllib2.urlopen(url, timeout=self.HTTP_TIMEOUT)
        try:
            return httpStream.read(), httpStream.info()
        finally:
            httpStream.close()

    def run(self):
        """Thread body: refresh the token shortly before it expires.

        The loop ticks once per second and ends once ``is_sigint_up`` is
        set. A failed refresh is logged and retried after RETRY_SECONDS
        instead of terminating the thread.
        """
        count = 0
        while True:
            if count >= self.expires_in - 2:
                try:
                    self.runMain()
                    count = 0
                except Exception:
                    # Keep the thread alive; otherwise the token would
                    # silently stop being refreshed.
                    traceback.print_exc()
                    count = max(0, self.expires_in - 2 - self.RETRY_SECONDS)
            if is_sigint_up:  # exit
                break
            count += 1
            time.sleep(1)

    def runMain(self):
        """Fetch a new access_token, then the WeChat callback IP list.

        Raises:
            RuntimeError: the token response contains no ``access_token``
                (for example an error payload from the API).
        """
        self._log("DEBUG", "INPUT: %s" % global_define.ACCESS_TOKEN_URL)
        httpContent, httpHeader = self._http_get(global_define.ACCESS_TOKEN_URL)
        self._log("DEBUG", "OUTPUT: %s" % httpContent)
        self._log("DEBUG", "OUTPUT HEADER: %s" % httpHeader)
        httpHeaderContenttype = httpHeader.getheader('Content-Type')
        self._log("DEBUG", "OUTPUT HEADER Content-Type: %s" % httpHeaderContenttype)
        hjson = json.loads(httpContent)
        self._log("DEBUG", "JSON: %s" % json.dumps(hjson, sort_keys=True, indent=4))
        if 'access_token' not in hjson:
            raise RuntimeError("access_token request failed: %s" % httpContent)
        self.access_token = hjson['access_token']
        # Keep the previous lifetime if the server did not send one.
        self.expires_in = int(hjson.get('expires_in', self.expires_in))
        self._log("DEBUG", "access_token: %s" % self.access_token)
        self._log("DEBUG", "expires_in: %d" % self.expires_in)
        url = "https://api.weixin.qq.com/cgi-bin/getcallbackip?access_token="+self.access_token
        httpContent, httpHeader = self._http_get(url)
        hjson = json.loads(httpContent)
        self.weiXinIpList = hjson.get('ip_list', [])
        self._log("DEBUG", "WeiXin server ip list: %s" % self.weiXinIpList)

    def GetAccessToken(self):
        """Return the current token, fetching it first if none is held yet."""
        if self.access_token == "":
            self._log("INFO", "not yet get access_token")
            self.runMain()
        self._log("DEBUG", "access_token: %s" % self.access_token)
        return self.access_token
global_define.py(xxx的地方要替换):
#!/usr/bin/python
#-*- encoding:utf-8 -*-
"""Shared configuration for the WeChat example; replace every "xxx" value."""

# Holder for the AccessToken thread object; main.py assigns it at startup.
accessToken = None

# WeChat test account: gh_589de042e9ab
TOKEN = "xxxxxxxxx"  # 3-32 characters
EncodingAESKey = "xxxxxxxxxxxxxx"  # 43 characters
APPID = "xxxxxxxxxxxxxxxxx"
APPSECRET = "xxxxxxxxxxxxxxxxxx"

# URL that returns a fresh access_token for the credentials above.
ACCESS_TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid="+APPID+u"&secret="+APPSECRET