Skip to content

天楚锐齿

人工智能 云计算 大数据 物联网 IT 通信 嵌入式

天楚锐齿

  • 下载
  • 物联网
  • 云计算
  • 大数据
  • 人工智能
  • Linux&Android
  • 网络
  • 通信
  • 嵌入式
  • 杂七杂八

使用python tornado实现微信公众号实例

2018-03-14
实现几个必须功能的例子,离实用远着呢:
main.py:
#!/usr/bin/python
#-*- encoding:utf-8 -*-

'''
需要安装:
$ sudo yum install python-devel
$ tar -zxvf pycrypto-2.6.1.tar.gz
$ cd pycrypto-2.6.1
$ sudo python setup.py build
$ sudo python setup.py install
$ sudo yum install libxml2-devel
$ sudo yum install libxml2-python
$ sudo yum install libxml++-devel
$ sudo yum install libxslt-devel
$ sudo pip install lxml
$ sudo pip install requests
后台运行:
$ sudo bash -c 'nohup python ./main.py &'
'''

import os.path
import platform
import hashlib
import time
import sys
import importlib
import urllib2
import requests
import mimetypes
import io
import json
# from xml.etree import cElementTree as ETree
import lxml.etree as ETree
# import torndb
import tornado.httpserver
import tornado.web
import tornado.ioloop
import tornado.options
import tornado.template

from tornado.options import define, options

import global_define
import access_token
from encrypt_WeiXin.WXBizMsgCrypt import WXBizMsgCrypt

SYS_PLATFORM = platform.system()

define("port", default=80, help="run port", type=int) # 微信只支持80端口
'''
define("mysql_host", default="127.0.0.1:3306", help="db host")
define("mysql_database", default="todo", help="db name")
define("mysql_user", default="root", help="db user")
define("mysql_password", default="", help="db password")
'''

TEMPLATE_PATH = os.path.join(os.path.dirname(__file__), "templates")
STATIC_PATH = os.path.join(os.path.dirname(__file__), "static")

class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/", IndexHandler),
            (r"/todo/new", NewHandler),
            (r"/todo/(\d+)/edit", EditHandler),
        ]
        if SYS_PLATFORM == "Windows":
            settings = dict(
                template_path = TEMPLATE_PATH,
                static_path = STATIC_PATH,
                debug = True # debug 和 autoreload 模式下不能启用多进程方式(启动debug,自动autoreload)
            )
        else:
            settings = dict(
                template_path = TEMPLATE_PATH,
                static_path = STATIC_PATH,
            )
        tornado.web.Application.__init__(self, handlers, **settings)
'''
        self.db = torndb.Connection(
            host = options.mysql_host,
            database = options.mysql_database,
            user = options.mysql_user,
            password = options.mysql_password
        )
'''

class IndexHandler(tornado.web.RequestHandler):
    def get(self): #  验证开发接口,参考 http://mp.weixin.qq.com/wiki/17/2d4265491f12608cd170a95559800f2d.html
        self.set_header("Server", "Car Ocean")
        self.set_header("X-Powered-By", "Car Ocean")
        token = global_define.TOKEN
        signatures = self.get_arguments("signature")
        if len(signatures) == 0:
            print("INFO: GET operation, need 'signature' parameter")
            return
        signature = self.get_arguments("signature")[0]
        timestamp = self.get_arguments("timestamp")[0]
        nonce = self.get_arguments("nonce")[0]
        echostr = self.get_arguments("echostr")[0]
        sortlist = [token, timestamp, nonce]
        sortlist.sort()
        shaStr = "".join(sortlist)
        sha = hashlib.sha1(shaStr)
#        sha.update("".join(sortlist))
        calculate_signature = sha.hexdigest()
        print("token:     %s" % token)
        print("timestamp: %s" % timestamp)
        print("nonce:     %s" % nonce)
        print("echostr:   %s" % echostr)
        print("getted    signature: %s" % signature)
        print("calculate signature: %s" % calculate_signature)
        if calculate_signature == signature:
            self.write(echostr)
        else:
            todos = ['ERROR', 'bbb']
            self.render("index.html", todos=todos)

    def post(self):
        self.set_header("Server", "Car Ocean")
        self.set_header("X-Powered-By", "Car Ocean")
        body = self.request.body
        print("Thread(Main), Time: %s, DEBUG: body: \n%s" % (time.ctime(), body))
        encrypt_typeList = self.get_arguments("encrypt_type")
        msg_signature = ""
        timestamp = ""
        nonce = ""
        if len(encrypt_typeList) == 0:
            print("Thread(Main), Time: %s, INFO: is not crypt message" % (time.ctime()))
            xmlData = ETree.fromstring(body)
            encrypt_type = "raw"
        else:
            encrypt_type = self.get_arguments("encrypt_type")[0]
            if encrypt_type == "raw":
                print("Thread(Main), Time: %s, INFO: is not crypt message" % (time.ctime()))
                xmlData = ETree.fromstring(body)
            elif encrypt_type == "aes":
                print("Thread(Main), Time: %s, INFO: aes crypt message" % (time.ctime()))
                msg_signature = self.get_arguments("msg_signature")[0]
                timestamp = self.get_arguments("timestamp")[0]
                nonce = self.get_arguments("nonce")[0]
                wxBizMsgCrypt = WXBizMsgCrypt(global_define.TOKEN, global_define.EncodingAESKey, global_define.APPID)
                ret , decryptBody = wxBizMsgCrypt.DecryptMsg(body, msg_signature, timestamp, nonce)
                print("Thread(Main), Time: %s, DEBUG: decrypt body: %s" % (time.ctime(), decryptBody))
                print("Thread(Main), Time: %s, DEBUG: msg_signature: %s" % (time.ctime(), msg_signature))
                print("Thread(Main), Time: %s, DEBUG: timestamp: %s" % (time.ctime(), timestamp))
                print("Thread(Main), Time: %s, DEBUG: nonce: %s" % (time.ctime(), nonce))
                xmlData = ETree.fromstring(decryptBody)
            else:
                print("Thread(Main), Time: %s, ERROR: not support encrypt_type: %s" % (time.ctime(), encrypt_type))
                xmlData = ""
        if not xmlData == "":
            toUserName = xmlData.find("ToUserName").text
            print("Thread(Main), Time: %s, DEBUG: ToUserName : %s" % (time.ctime(), toUserName))
            fromUserName = xmlData.find("FromUserName").text
            print("Thread(Main), Time: %s, DEBUG: FromUserName : %s" % (time.ctime(), fromUserName))
            createTime = xmlData.find("CreateTime").text
            print("Thread(Main), Time: %s, DEBUG: CreateTime : %s" % (time.ctime(), createTime))
            msgType = xmlData.find("MsgType").text
            print("Thread(Main), Time: %s, DEBUG: MsgType : %s" % (time.ctime(), msgType))
            if( msgType == "event" ):
                event = xmlData.find("Event").text
                print("Thread(Main), Time: %s, DEBUG: Event : %s" % (time.ctime(), event))
                if event == "subscribe":
                    self.Subscribe(encrypt_type, toUserName, fromUserName, createTime)
                elif event == "unsubscribe":
                    self.Unsubscribe()
                elif event == "CLICK":
                    eventKey = xmlData.find("EventKey").text
                    print("Thread(Main), Time: %s, DEBUG: EventKey : %s" % (time.ctime(), eventKey))
                    self.MenuClick(encrypt_type, toUserName, fromUserName, createTime,  eventKey)
                else:
                    self.write("")
            if (msgType == "text"):
                Content = xmlData.find("Content").text
                print("Thread(Main), Time: %s, DEBUG: Content : %s" % (time.ctime(), Content))
                MsgId = xmlData.find("MsgId").text
                print("Thread(Main), Time: %s, DEBUG: MsgId : %s" % (time.ctime(), MsgId))
                if Content == "1":
                    self.RecvTextMsg1(encrypt_type, toUserName, fromUserName, createTime, Content, MsgId)
                elif Content == "2":
                    self.RecvTextMsg2(encrypt_type, toUserName, fromUserName, createTime, Content, MsgId)
                else:
                    self.write("")

    def MenuClick(self, msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime,  msgEventKey):
        # 生成未加密原始xml消息
        xmlRoot = ETree.Element("xml")
        toUserName = ETree.SubElement(xmlRoot, 'ToUserName')
        toUserName.text = ETree.CDATA(msgFromUserName)
        fromUserName = ETree.SubElement(xmlRoot, 'FromUserName')
        fromUserName.text = ETree.CDATA(msgToUserName)
        createTime = ETree.SubElement(xmlRoot, 'CreateTime')
        createTime.text = msgCreateTime
        msgType = ETree.SubElement(xmlRoot, 'MsgType')
        msgType.text = ETree.CDATA("text")
        content = ETree.SubElement(xmlRoot, 'Content')
        content.text = ETree.CDATA(u"点击了菜单:"+msgEventKey)  #注意在xml中的中文需要使用unicode方式
        xmlData = ETree.tostring(xmlRoot, encoding="utf-8")
        print("Thread(Main), Time: %s, DEBUG: Return original message : \n%s" % (time.ctime(), xmlData))
        # 加密处理
        if (msgEncrypt_type == "raw"):
            self.write(xmlData)
        else:
            nonce = str(int(time.time()))
            wxBizMsgCrypt = WXBizMsgCrypt(global_define.TOKEN, global_define.EncodingAESKey, global_define.APPID)
            ret, encryptBody = wxBizMsgCrypt.EncryptMsg(xmlData, nonce)
            if ret != 0:
                print("Thread(Main), Time: %s, ERROR: EncryptMsg error : %d" % (time.ctime(), ret))
                self.write("")
            xmlData = encryptBody
            print("Thread(Main), Time: %s, DEBUG: Return encrypted message : \n%s" % (time.ctime(), xmlData))
            self.set_header('Content-Type', 'text/xml; encoding=utf-8')
            self.write(xmlData)

    def RecvTextMsg1(self, msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime, msgContent, msgMsgId):
        # 生成未加密原始xml消息
        xmlRoot = ETree.Element("xml")
        toUserName = ETree.SubElement(xmlRoot, 'ToUserName')
        toUserName.text = ETree.CDATA(msgFromUserName)
        fromUserName = ETree.SubElement(xmlRoot, 'FromUserName')
        fromUserName.text = ETree.CDATA(msgToUserName)
        createTime = ETree.SubElement(xmlRoot, 'CreateTime')
        createTime.text = msgCreateTime
        msgType = ETree.SubElement(xmlRoot, 'MsgType')
        msgType.text = ETree.CDATA("text")
        content = ETree.SubElement(xmlRoot, 'Content')
        content.text = ETree.CDATA(u"您订阅成功!")
        xmlData = ETree.tostring(xmlRoot, encoding="utf-8")
        print("Thread(Main), Time: %s, DEBUG: Return original message : \n%s" % (time.ctime(), xmlData))
        # 加密处理
        if (msgEncrypt_type == "raw"):
            self.write(xmlData)
        else:
            nonce = str(int(time.time()))
            wxBizMsgCrypt = WXBizMsgCrypt(global_define.TOKEN, global_define.EncodingAESKey, global_define.APPID)
            ret, encryptBody = wxBizMsgCrypt.EncryptMsg(xmlData, nonce)
            if ret != 0:
                print("Thread(Main), Time: %s, ERROR: EncryptMsg error : %d" % (time.ctime(), ret))
                self.write("")
            xmlData = encryptBody
            print("Thread(Main), Time: %s, DEBUG: Return encrypted message : \n%s" % (time.ctime(), xmlData))
            self.set_header('Content-Type', 'text/xml; encoding=utf-8')
            self.write(xmlData)

    def RecvTextMsg2(self, msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime, msgContent, msgMsgId):
        # 生成未加密原始xml消息
        xmlRoot = ETree.Element("xml")
        toUserName = ETree.SubElement(xmlRoot, 'ToUserName')
        toUserName.text = ETree.CDATA(msgFromUserName)
        fromUserName = ETree.SubElement(xmlRoot, 'FromUserName')
        fromUserName.text = ETree.CDATA(msgToUserName)
        createTime = ETree.SubElement(xmlRoot, 'CreateTime')
        createTime.text = msgCreateTime
        msgType = ETree.SubElement(xmlRoot, 'MsgType')
        msgType.text = ETree.CDATA("news")
        articleCount = ETree.SubElement(xmlRoot, 'ArticleCount')
        articleCount.text = "1"
        articles = ETree.SubElement(xmlRoot, 'Articles')
        item = ETree.SubElement(articles, 'item')
        title = ETree.SubElement(item, 'Title')
        title.text = ETree.CDATA(u"车可讯")
        description = ETree.SubElement(item, 'Description')
        description.text = ETree.CDATA(u"欢迎光临车可讯")
        picUrl = ETree.SubElement(item, 'PicUrl')
        picUrl.text = ETree.CDATA("http://mmbiz.qpic.cn/mmbiz_png/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN7Z3p7x1N002uBiaHsYKTksMNrSUT7icshGNia1xtAYAbCTkiaia4yLqtwWA/0")
        url = ETree.SubElement(item, 'Url')
        url.text = ETree.CDATA("http://cloud.car-ocean.com:8080/")

        xmlData = ETree.tostring(xmlRoot, encoding="utf-8")
        print("Thread(Main), Time: %s, DEBUG: Return original message : \n%s" % (time.ctime(), xmlData))
        # 加密处理
        if (msgEncrypt_type == "raw"):
            self.write(xmlData)
        else:
            nonce = str(int(time.time()))
            wxBizMsgCrypt = WXBizMsgCrypt(global_define.TOKEN, global_define.EncodingAESKey, global_define.APPID)
            ret, encryptBody = wxBizMsgCrypt.EncryptMsg(xmlData, nonce)
            if ret != 0:
                print("Thread(Main), Time: %s, ERROR: EncryptMsg error : %d" % (time.ctime(), ret))
                self.write("")
            xmlData = encryptBody
            print("Thread(Main), Time: %s, DEBUG: Return encrypted message : \n%s" % (time.ctime(), xmlData))
            self.set_header('Content-Type', 'text/xml; encoding=utf-8')
            self.write(xmlData)

    def Subscribe(self, msgEncrypt_type, msgToUserName, msgFromUserName, msgCreateTime):
        # 生成未加密原始xml消息
        xmlRoot = ETree.Element("xml")
        toUserName = ETree.SubElement(xmlRoot, 'ToUserName')
        toUserName.text = ETree.CDATA(msgFromUserName)
        fromUserName = ETree.SubElement(xmlRoot, 'FromUserName')
        fromUserName.text = ETree.CDATA(msgToUserName)
        createTime = ETree.SubElement(xmlRoot, 'CreateTime')
        createTime.text = msgCreateTime
        msgType = ETree.SubElement(xmlRoot, 'MsgType')
        msgType.text = ETree.CDATA("text")
        content = ETree.SubElement(xmlRoot, 'Content')
        content.text = ETree.CDATA(u"您订阅成功!欢迎您的到来!")
        xmlData = ETree.tostring(xmlRoot, encoding="utf-8")
        print("Thread(Main), Time: %s, DEBUG: Return original message : \n%s" % (time.ctime(), xmlData))

        # 加密处理
        if( msgEncrypt_type == "raw" ):
            self.write(xmlData)
        else:
            nonce = str(int(time.time()))
            wxBizMsgCrypt = WXBizMsgCrypt(global_define.TOKEN, global_define.EncodingAESKey, global_define.APPID)
            ret, encryptBody = wxBizMsgCrypt.EncryptMsg(xmlData, nonce)
            if ret != 0:
                print("Thread(Main), Time: %s, ERROR: EncryptMsg error : %d" % (time.ctime(), ret))
                self.write("")
            xmlData = encryptBody
            print("Thread(Main), Time: %s, DEBUG: Return encrypted message : \n%s" % (time.ctime(), xmlData))
            self.set_header('Content-Type', 'text/xml; encoding=utf-8')
            self.write(xmlData)

    def Unsubscribe(self):
        self.write("")

class NewHandler(tornado.web.RequestHandler):
    def post(self):
        title = self.get_argument("title")
        if not title:
            return None
        self.redirect("/")


class EditHandler(tornado.web.RequestHandler):
    def get(self, id):
        todo = 1
        return self.render("edit.html", todo=todo)

    def post(self, id):
        self.redirect("/")

class JSONObject:
    def __init__(self, d):
        self.__dict__ = d

def main():
    tornado.options.parse_command_line()
    app = tornado.httpserver.HTTPServer(Application())
    if SYS_PLATFORM == "Windows":
        app.listen(options.port)
    else:
        app.bind(options.port)
        app.start(num_processes=0) # 启用多进程,0为启动CPU个数的进程,必须关掉debug模式(settings的debug=true语句),windows下不能用
    try:
        tornado.ioloop.IOLoop.instance().start()
    except KeyboardInterrupt:
        tornado.ioloop.IOLoop.instance().stop()

if __name__ == "__main__":
    global_define.accessToken = access_token.AccessToken(1)
    if not SYS_PLATFORM == "Windows":
        global_define.accessToken.start()
    access_token = global_define.accessToken.GetAccessToken() # 第一次取
    access_token = global_define.accessToken.access_token     # 需要时这样取

    # 上传素材图片: 永久
    """
    fileName = '../car_ocean_logo.png'
    if(os.path.isfile(fileName)):
        fileData = io.BytesIO(open(fileName).read())
        fileType = mimetypes.guess_type(fileName) or 'application/octet-stream'
        fields = {}
        files = {'media': ('car_ocean_logo.png', fileData, fileType)}
        url = "https://api.weixin.qq.com/cgi-bin/material/add_material?access_token="+access_token+"&type=image"
        print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
        request = requests.post(url, verify=False, data=fields, files=files)
        print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), request.content))
    else:
        print("Thread(Main), Time: %s, ERROR: file is not exist: %s" % (time.ctime(), fileName))
    """
    '''
    返回值:
    {"media_id": "EXXGrEsCjc4xXltwe1bKnwygZ1EjuWxZxjmHANE2nGE",
     "url": "http:\/\/mmbiz.qpic.cn\/mmbiz_png\/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN7Z3p7x1N002uBiaHsYKTksMNrSUT7icshGNia1xtAYAbCTkiaia4yLqtwWA\/0?wx_fmt=png"}
    '''
    # 上传素材缩略图:永久
    """
    fileName = '../car_ocean_logo.jpg'
    if(os.path.isfile(fileName)):
        fileData = io.BytesIO(open(fileName).read())
        fileType = mimetypes.guess_type(fileName) or 'application/octet-stream'
        fields = {}
        files = {'media': ('car_ocean_logo.jpg', fileData, fileType)}
        url = "https://api.weixin.qq.com/cgi-bin/material/add_material?access_token="+access_token+"&type=thumb"
        print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
        request = requests.post(url, verify=False, data=fields, files=files)
        print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), request.content))
    else:
        print("Thread(Main), Time: %s, ERROR: file is not exist: %s" % (time.ctime(), fileName))
    """
    '''
    返回值:
    {"media_id":"EXXGrEsCjc4xXltwe1bKn9doo_oPZSwcCrlHVL8h2_U",
    "url":"http:\/\/mmbiz.qpic.cn\/mmbiz_jpg\/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN1300ZIOiasdJdqTerKJ3ootb3b1SzOW0yxtWnqQDDSVkicvSuScicE4og\/0?wx_fmt=jpeg"}
    '''
    # 上传素材图片: 临时
    """
    fileName = '../car_ocean_logo.png'
    if(os.path.isfile(fileName)):
        fileData = io.BytesIO(open(fileName).read())
        fileType = mimetypes.guess_type(fileName) or 'application/octet-stream'
        fields = {}
        files = {'media': ('car_ocean_logo.png', fileData, fileType)}
        url = "https://api.weixin.qq.com/cgi-bin/media/uploadimg?access_token="+access_token+"&type=image"
        print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
        request = requests.post(url, verify=False, data=fields, files=files)
        print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), request.content))
    else:
        print("Thread(Main), Time: %s, ERROR: file is not exist: %s" % (time.ctime(), fileName))
    """
    '''
    返回值:
    {"url":"http:\/\/mmbiz.qpic.cn\/mmbiz_png\/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN7Z3p7x1N002uBiaHsYKTksMNrSUT7icshGNia1xtAYAbCTkiaia4yLqtwWA\/0"}
    '''
    # 上传素材图文:永久
    """
    content1 = "欢迎光临车可讯"   #注意在json素材中的中文需要使用默认utf-8方式
    fileName = 'templates/test.html'
    if(os.path.isfile(fileName)):
        content1 = open(fileName).read()
    content2 = "测试用"
    data ={
        "articles":[
            {"title": "车可讯",
             "thumb_media_id": "EXXGrEsCjc4xXltwe1bKn9doo_oPZSwcCrlHVL8h2_U",
             "author": "Car Ocean",
             "digest": "",
             "show_cover_pic": 0,
             "content": content1,
             "content_source_url": "http://www.car-ocean.com/"
            },
            {"title": "测试1",
             "thumb_media_id": "EXXGrEsCjc4xXltwe1bKn9doo_oPZSwcCrlHVL8h2_U",
             "author": "Car Ocean",
             "digest": "",
             "show_cover_pic": 0,
             "content": content2,
             "content_source_url": "https://cloud.car-ocean.com:8443/admin/"
            },
        ]
    }
    json_str = json.dumps(data, ensure_ascii=False)
    print("Thread(Main), Time: %s, DEBUG: REQUEST JSON: %s" % (time.ctime(), json_str))
    url = "https://api.weixin.qq.com/cgi-bin/material/add_news?access_token="+access_token
    print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
    request = urllib2.Request(url)
    request.add_header("Content-Type", "application/json")
    response = urllib2.urlopen(request, json.dumps(data, ensure_ascii=False)).read()  #注意ensure_ascii必须为False
    dataRtn = json.loads(response, object_hook=JSONObject)
    print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), response))
    if(hasattr(dataRtn, "media_id")):
        print("Thread(Main), Time: %s, DEBUG: OUTPUT JSONObject: %s" % (time.ctime(), dataRtn.media_id))
    else:
        dataRtn.media_id = ""
    """
    '''
    返回值:
    {"media_id":"EXXGrEsCjc4xXltwe1bKn8uwkLzodNgZ-zdZ3DaL6kc"}
    删除:
    curl -d '{"media_id":"EXXGrEsCjc4xXltwe1bKn8uwkLzodNgZ-zdZ3DaL6kc"}' https://api.weixin.qq.com/cgi-bin/material/del_material?access_token=hSNOuGcaMdtV-WpgnvplF1ZIOmC8shJKPbeeXUN6L1wyFcBnuzZ71fCxd85dpNYguhgp5V96kuWvOWQTFJjqnSzkp-oydPbuC7lPvSzkEWob8g9_ULpgvsx_AV3xogYfIYGdABAZPX
    '''
    # 推送消息:每天不超过一次,需认证后能用
    """
    data ={
        "filter":{
            "is_to_all":True
        },
        "mpnews":{
            "media_id":dataRtn.media_id
        },
        "msgtype":"mpnews"
    }
    data ={
        "touser":"oJ-k1wLaKRynhjPWoRaa4_vAH36o",
        "mpnews":{
            "media_id":dataRtn.media_id
        },
        "msgtype":"mpnews"
    }
    json_str = json.dumps(data, ensure_ascii=False)
    print("Thread(Main), Time: %s, DEBUG: REQUEST JSON: %s" % (time.ctime(), json_str))
    # url = "https://api.weixin.qq.com/cgi-bin/message/mass/sendall?access_token="+access_token
    url = "https://api.weixin.qq.com/cgi-bin/message/mass/preview?access_token="+access_token  # 仅预览测试
    print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
    request = urllib2.Request(url)
    request.add_header("Content-Type", "application/json")
    response = urllib2.urlopen(request, json.dumps(data, ensure_ascii=False)).read()
    print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), response))
    dataRtn = json.loads(response, object_hook=JSONObject)
    """
    '''
    返回值:
    '''

    # 创建菜单
    """
    data ={
        "button":[
        {
            "type":"click",
            "name":"点击菜单",
            "key":"MENU_1_NONE"
        },
        {
            "name":"含子菜单",
            "sub_button":[
            {
                "type":"view",
                "name":"车可讯官网",
                "url":"http://www.car-ocean.com/"
            },
            {
               "type":"view",
               "name":"车可讯下载",
               "url":"http://cloud.car-ocean.com:8080/"
            }
            ]
        }
        ]
    }
    json_str = json.dumps(data, ensure_ascii=False)
    print("Thread(Main), Time: %s, DEBUG: REQUEST JSON: %s" % (time.ctime(), json_str))
    url = "https://api.weixin.qq.com/cgi-bin/menu/create?access_token="+access_token
    print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
    request = urllib2.Request(url)
    request.add_header("Content-Type", "application/json")
    response = urllib2.urlopen(request, json.dumps(data, ensure_ascii=False)).read()
    dataRtn = json.loads(response, object_hook=JSONObject)
    print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), response))
    if(hasattr(dataRtn, "errmsg")):
        print("Thread(Main), Time: %s, DEBUG: OUTPUT JSONObject: %s" % (time.ctime(), dataRtn.errmsg))
    """
    '''
    返回值:
    删除:
    '''

    main()
access_token.py:
#!/usr/bin/python
#-*- encoding:utf-8 -*-

import time
import threading
import urllib2
import json
import traceback
import signal
import platform

import global_define

# 处理键盘中断
def sigint_handler(signum, frame):
    global is_sigint_up
    is_sigint_up = True
    print('CATCHED INTERRUPT SIGNAL!')
    raise SystemExit('Ctrl+C')
signal.signal(signal.SIGINT, sigint_handler)
if not platform.system() == "Windows":
    signal.signal(signal.SIGHUP, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
is_sigint_up = False

class AccessToken(threading.Thread):
    def __init__(self, number):
        #在我重写__init__方法的时候要记得调用基类的__init__ 方法  
        threading.Thread.__init__(self)
        self.number = number
        self.access_token = ""
        self.expires_in = 7200

    def run(self): #重写run()方法,把自己的线程函数的 代码放到这里
        count = 0
        while True:
            if(count >= self.expires_in - 2):
                count = 0
                self.runMain()
            if(is_sigint_up): # 退出
                break
            count += 1
            time.sleep(1)

    def runMain(self):
        print("Thread(%d), Time: %s, DEBUG: INPUT:  %s" % (self.number, time.ctime(), global_define.ACCESS_TOKEN_URL))
        httpStream = urllib2.urlopen(global_define.ACCESS_TOKEN_URL)
        httpContent = httpStream.read()
        print("Thread(%d), Time: %s, DEBUG: OUTPUT: %s" % (self.number, time.ctime(), httpContent))
        httpHeader = httpStream.info()
        print("Thread(%d), Time: %s, DEBUG: OUTPUT HEADER: %s" % (self.number, time.ctime(), httpHeader))
        httpHeaderContenttype = httpHeader.getheader('Content-Type')
        print("Thread(%d), Time: %s, DEBUG: OUTPUT HEADER Content-Type: %s" % (self.number, time.ctime(), httpHeaderContenttype))
        hjson = json.loads(httpContent)
        print("Thread(%d), Time: %s, DEBUG: JSON: %s" % (self.number, time.ctime(), json.dumps(hjson, sort_keys=True, indent=4)))
        self.access_token = hjson['access_token']
        self.expires_in = hjson['expires_in']
        print('Thread(%d), Time: %s, DEBUG: access_token: %s' % (self.number, time.ctime(), self.access_token))
        print('Thread(%d), Time: %s, DEBUG: expires_in: %d' % (self.number, time.ctime(), self.expires_in))
        url = "https://api.weixin.qq.com/cgi-bin/getcallbackip?access_token="+self.access_token
        httpStream = urllib2.urlopen(url)
        httpContent = httpStream.read()
        hjson = json.loads(httpContent)
        self.weiXinIpList = hjson['ip_list']
        print('Thread(%d), Time: %s, DEBUG: WeiXin server ip list: %s' % (self.number, time.ctime(), self.weiXinIpList))

    def GetAccessToken(self):
        if self.access_token == "":
            print("Thread(%d), Time: %s, INFO: not yet get access_token" % (self.number, time.ctime()))
            self.runMain()
        print("Thread(%d), Time: %s, DEBUG: access_token: %s" % (self.number, time.ctime(), self.access_token))
        return self.access_token
global_define.py(xxx的地方要替换):
#!/usr/bin/python
#-*- encoding:utf-8 -*-

global accessToken

# 微信测试号: gh_589de042e9ab
TOKEN = "xxxxxxxxx" # 3-32字符
EncodingAESKey = "xxxxxxxxxxxxxx" # 43字符

APPID = "xxxxxxxxxxxxxxxxx"
APPSECRET = "xxxxxxxxxxxxxxxxxx"
ACCESS_TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid="+APPID+u"&secret="+APPSECRET

1,878次阅读

Post navigation

前一篇:

阿里云内部SLB的问题

后一篇:

3G、4G的APN

发表回复 取消回复

要发表评论,您必须先登录。

个人介绍

需要么,有事情这里找联系方式:关于天楚锐齿

=== 美女同欣赏,好酒共品尝 ===

微信扫描二维码赞赏该文章:

扫描二维码分享该文章:

分类

  • Linux&Android (81)
  • Uncategorized (1)
  • 下载 (28)
  • 云计算 (38)
  • 人工智能 (9)
  • 大数据 (35)
  • 嵌入式 (34)
  • 杂七杂八 (35)
  • 物联网 (65)
  • 网络 (25)
  • 通信 (22)

归档

近期文章

  • 飞书机器人发送卡片interactive消息
  • Springboot JPA实现对数据库表统一的增删改查
  • WEB的内容安全策略CSP(Content-Security-Policy)
  • CSS利用@media和viewport实现响应式布局自动适配手机电脑等
  • VUE前端增加国际化支持

近期评论

  • linux爱好者 发表在《Linux策略路由及iptables mangle、ip rule、ip route关系及一种Network is unreachable错误》
  • maxshu 发表在《使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序》
  • Ambition 发表在《使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序》
  • Ambition 发表在《使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序》
  • maxshu 发表在《Android9下用ethernet 的Tether模式来做路由器功能》

阅读量

  • 使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序 - 23,708次阅读
  • 卸载深信服Ingress、SecurityDesktop客户端 - 18,404次阅读
  • 车机技术之车规级Linux-Automotive Grade Linux(AGL) - 10,475次阅读
  • linux下的unbound DNS服务器设置详解 - 9,183次阅读
  • 在Android9下用ndk编译vSomeIP和CommonAPI以及使用例子 - 9,074次阅读
  • linux的tee命令导致ssh客户端下的shell卡住不动 - 8,579次阅读
  • Linux策略路由及iptables mangle、ip rule、ip route关系及一种Network is unreachable错误 - 8,069次阅读
  • 车机技术之360°全景影像(环视)系统 - 8,051次阅读
  • 车机技术之Android Automotive - 7,907次阅读
  • Windows下安装QEMU并在qemu上安装ubuntu和debian - 7,792次阅读

其他操作

  • 注册
  • 登录
  • 条目 feed
  • 评论 feed
  • WordPress.org

联系方式

地址
深圳市科技园

时间
周一至周五:  9:00~12:00,14:00~18:00
周六和周日:10:00~12:00

标签

android AT命令 CAN centos docker Hadoop hdfs ip java kickstart linux mapreduce mini6410 modem nova OAuth openstack os python socket ssh uboot 内核 协议 安装 嵌入式 性能 报表 授权 操作系统 数据 数据库 月报 模型 汽车 深信服 源代码 统计 编译 脚本 虚拟机 调制解调器 车机 金融 鉴权
© 2025 天楚锐齿