Kaynağa Gözat

first commit

pull/1/head
10710 1 yıl önce
işleme
628003984e
24 değiştirilmiş dosya ile 968 ekleme ve 0 silme
  1. +160
    -0
      .gitignore
  2. +0
    -0
      embeddings/__init__.py
  3. +17
    -0
      embeddings/dashscopeembedding.py
  4. +12
    -0
      embeddings/embedding.py
  5. +22
    -0
      embeddings/liandongembedding.py
  6. +13
    -0
      embeddings/zhipuembedding.py
  7. +222
    -0
      knowledge.py
  8. +0
    -0
      llms/__init__.py
  9. +44
    -0
      llms/baidullm.py
  10. +29
    -0
      llms/liandongllm.py
  11. +12
    -0
      llms/llm.py
  12. +154
    -0
      llms/sparkllm.py
  13. +17
    -0
      llms/zhipullm.py
  14. +63
    -0
      main.py
  15. +9
    -0
      requirements.txt
  16. +33
    -0
      settings.ini
  17. +0
    -0
      sources/__init__.py
  18. +45
    -0
      sources/essource.py
  19. +12
    -0
      sources/source.py
  20. +41
    -0
      sources/sqlsource.py
  21. +0
    -0
      tools/__init__.py
  22. +28
    -0
      tools/kafka.py
  23. +17
    -0
      tools/logger.py
  24. +18
    -0
      tools/myThread.py

+ 160
- 0
.gitignore Dosyayı Görüntüle

@@ -0,0 +1,160 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

+ 0
- 0
embeddings/__init__.py Dosyayı Görüntüle


+ 17
- 0
embeddings/dashscopeembedding.py Dosyayı Görüntüle

@@ -0,0 +1,17 @@
# coding=utf-8
from embeddings.embedding import Embedding
import dashscope
from http import HTTPStatus
from dashscope import TextEmbedding


class Dashscopeembedding(Embedding):
    """Embedding backend backed by DashScope's TextEmbedding API."""

    def __init__(self, **param):
        # DashScope authenticates through a module-level API key.
        dashscope.api_key = param['api_key']

    def getem(self, question):
        """Return the embedding vector for *question*.

        Raises:
            RuntimeError: when the API call does not return HTTP 200.
                (The previous code silently returned None, which surfaced
                later as an opaque crash in the cosine-similarity math.)
        """
        resp = TextEmbedding.call(model=TextEmbedding.Models.text_embedding_v1,
                                  input=question,
                                  text_type='query')
        if resp.status_code != HTTPStatus.OK:
            raise RuntimeError('DashScope embedding request failed: %s'
                               % getattr(resp, 'message', resp.status_code))
        return resp['output']['embeddings'][0]['embedding']

+ 12
- 0
embeddings/embedding.py Dosyayı Görüntüle

@@ -0,0 +1,12 @@
# coding=utf-8
from abc import ABC, abstractmethod


class Embedding(ABC):
    """Interface every text-embedding backend must implement."""

    @abstractmethod
    def __init__(self, **param):
        """Configure the backend from keyword parameters (keys, URLs, ...)."""

    @abstractmethod
    def getem(self, question):
        """Return the embedding vector for the text *question*."""

+ 22
- 0
embeddings/liandongembedding.py Dosyayı Görüntüle

@@ -0,0 +1,22 @@
# coding=utf-8
from embeddings.embedding import Embedding
import requests
import json


class Liandongembedding(Embedding):
    """Embedding backend backed by the Liandong HTTP embedding service."""

    def __init__(self, **param):
        self.url = param['api_url']

    def getem(self, question):
        """POST *question* to the service; return the embedding vector."""
        # The old code sent a literal Postman placeholder as Content-Length
        # ('<calculated when request is sent>'); requests computes the real
        # length itself, so the bogus header is simply dropped.
        headers = {
            'Content-Type': 'application/json'
        }
        data = {
            "text": question
        }
        # timeout so a dead endpoint cannot hang the caller forever
        result = requests.post(self.url,
                               headers=headers,
                               data=json.dumps(data),
                               timeout=30)
        return json.loads(result.content)['data']

+ 13
- 0
embeddings/zhipuembedding.py Dosyayı Görüntüle

@@ -0,0 +1,13 @@
# coding=utf-8
import zhipuai
from embeddings.embedding import Embedding


class Zhipuembedding(Embedding):
    """Embedding backend backed by Zhipu's text_embedding model."""

    def __init__(self, **param):
        # Zhipu authenticates through a module-level API key.
        zhipuai.api_key = param['api_key']

    def getem(self, question):
        """Return the embedding vector for *question*."""
        reply = zhipuai.model_api.invoke(model="text_embedding",
                                         prompt=question)
        return reply['data']['embedding']

+ 222
- 0
knowledge.py Dosyayı Görüntüle

@@ -0,0 +1,222 @@
# coding=utf-8
import numpy as np
import configparser
from datetime import datetime
import json

from tools.logger import Logger


# Cosine-similarity thresholds used by Knowledge.recommend:
LTHRESHOLD = 0.4  # candidates scoring below this are discarded
MTHRESHOLD = 0.6  # above this the best hit gets highlight=1
HTHRESHOLD = 0.8  # above this we return the top hits immediately (accurate=1)
DB = 'xy-cloud1'  # database name passed to the data-source plugin
INDEX = 'dm_q_and_a,dm_questions'  # comma-separated table/index names
cache = {}  # module-level embedding cache: question text -> vector


class Knowledge():
    """Question-recommendation engine.

    Wires together a data source, an embedding backend and an LLM (all
    selected via settings.ini and the *model* argument), then ranks stored
    questions against an incoming question by cosine similarity, optionally
    asking the LLM to pick the four closest in intent.
    """

    def __init__(self, model):
        self.model = model + 'llm'
        self.config = configparser.ConfigParser()
        self.config.read("settings.ini", encoding="utf-8")

        # Data-source plugin (sources/<name>source.py).
        self.sourcename = self.config.get('config', 'source') + 'source'
        source_config = dict(self.config.items(self.sourcename))
        source_config['index'] = INDEX
        source_config['db'] = DB
        self.sources = __import__('sources.%s' % self.sourcename,
                                  fromlist=['sources'])
        # getattr() replaces the previous eval(): same lookup by capitalized
        # module name, without evaluating arbitrary strings as code.
        self.source = getattr(self.sources,
                              self.sourcename.capitalize())(**source_config)

        self.logger = Logger(self.config.get("config", "logger_path"))

        # Embedding plugin (embeddings/<name>embedding.py).
        self.emname = self.config.get('config', 'embedding') + 'embedding'
        embedding_config = dict(self.config.items(self.emname))
        self.ems = __import__('embeddings.%s' % self.emname,
                              fromlist=['embeddings'])
        self.embedding = getattr(self.ems,
                                 self.emname.capitalize())(**embedding_config)

        # LLM plugin (llms/<name>llm.py).
        self.llmname = self.model
        llm_config = dict(self.config.items(self.llmname))
        self.llms = __import__('llms.%s' % self.llmname,
                               fromlist=['llms'])
        self.llm = getattr(self.llms,
                           self.llmname.capitalize())(**llm_config)

    def combine(self, result, question, accurate, tenantId):
        """Wrap *result* in the response envelope returned to callers.

        An empty-string *result* means "no recommendation"; in that case the
        'result' key is omitted from the payload entirely.
        """
        Result = {'code': 200, 'target': 1}
        data = {}
        if not result == '':
            data['result'] = result
        data['question'] = question
        data['accurate'] = accurate
        data['llm'] = self.config.get('config', 'model')
        data['tenantId'] = tenantId
        Result['data'] = data
        return Result

    def log_header(self, method):
        """Build the shared JSON-ish prefix of one structured log line."""
        timestamp = datetime.strftime(datetime.now(), '%Y-%m-%dT%H:%M:%S:%f')
        timestamp = '"timestamp":"' + timestamp + '",'
        # NOTE(review): "tentant_id" looks like a typo for "tenant_id" but is
        # kept byte-for-byte so existing log consumers keep working.
        tenant = '"tentant_id":' + str(self.tenant_id) + ','
        em = '"embedding":"' + self.emname + '",'
        model = '"model":"' + self.llmname + '",'
        source = '"source":"' + self.sourcename + '",'
        questionstr = '"question":"' + self.question + '","answers":['
        logs = ',"method":' + method + ','
        logs = timestamp + tenant + em + model + source + logs + questionstr
        return logs

    def _log_name_id(self, method, items):
        """Emit one log line listing ["name", id] for every item.

        Factored out of the five near-identical loops the original code had.
        """
        logs = self.log_header(method)
        for i in items:
            logs = logs + '["' + i['name'] + '",' + i['id'] + '],'
        logs = logs[0:-1] if logs[-1] == ',' else logs
        self.logger.info(logs + ']')

    def emsearch(self):
        """Rank all stored questions by cosine similarity to self.question.

        Returns [candidates, best_score]: at most 6 candidates whose
        similarity is >= LTHRESHOLD, best first; best_score is -2 when the
        tenant has no data at all.
        """
        data_all = self.source.getdata(tenant_id=self.tenant_id)
        self._log_name_id('"emlist"', data_all)

        em_list = []
        score_em_max = -2
        if len(data_all) > 0:
            all_em_score = np.zeros(len(data_all))
            v1 = self.embedding.getem(self.question)
            for i in range(len(data_all)):
                # Embeddings of stored questions are cached module-wide.
                if data_all[i]['name'] in cache.keys():
                    v2 = cache[data_all[i]['name']]
                else:
                    v2 = self.embedding.getem(data_all[i]['name'])
                    cache[data_all[i]['name']] = v2
                numerator = np.dot(v1, v2)
                denominator = (np.linalg.norm(v1) * np.linalg.norm(v2))
                all_em_score[i] = numerator / denominator
            logs = self.log_header('"emsearch"')
            for i in range(6):
                t = np.argmax(all_em_score)
                if all_em_score[t] < LTHRESHOLD:
                    break
                if i == 0:
                    score_em_max = all_em_score[t]
                if all_em_score[t] <= -1:
                    break
                em_list.append(data_all[t])
                item = '"' + data_all[t]['name'] + '",'
                item = item + data_all[t]['id']
                item = item + ',%.3f' % all_em_score[t]
                logs = logs + '[' + item + '],'
                all_em_score[t] = -2  # mark this entry as consumed
            logs = logs[0:-1] if logs[-1] == ',' else logs
            self.logger.info(logs + ']')
        return [em_list, score_em_max]

    def recommend(self, tenant_id=None, question=None):
        """Recommend up to four stored questions for the current question.

        Historically callers assigned self.tenant_id / self.question before
        calling recommend() with no arguments; the new optional parameters
        keep that working while also supporting the direct
        recommend(tenant_id, question) call that main.py makes (which raised
        TypeError against the old zero-argument signature).
        """
        if tenant_id is not None:
            self.tenant_id = tenant_id
        if question is not None:
            self.question = question

        self._log_name_id('"searchbegin"', [])
        [result_list, score_em_max] = self.emsearch()
        if score_em_max > HTHRESHOLD:
            # Confident hit: return immediately, flagged as accurate.
            result_list[0]['highlight'] = 1
            self._log_name_id('"recommend"', result_list)
            return self.combine(result_list[0:4],
                                self.question,
                                1,
                                self.tenant_id)
        if score_em_max > MTHRESHOLD:
            result_list[0]['highlight'] = 1

        self._log_name_id('"recommendlist"', result_list)
        if len(result_list) == 0:
            self._log_name_id('"recommend"', [])
            return self.combine("",
                                self.question,
                                0,
                                self.tenant_id)
        if len(result_list) <= 4:
            self._log_name_id('"recommend"', result_list)
            return self.combine(result_list,
                                self.question,
                                0,
                                self.tenant_id)

        if self.config.get('config', 'usellm') == "0":
            self._log_name_id('"recommend"', result_list)
            return self.combine(result_list[0:4],
                                self.question,
                                0,
                                self.tenant_id)

        # More than four candidates and the LLM is enabled: ask it to pick
        # the four closest in intent.
        L = ''
        for i in result_list:
            L = L + '"' + i['name'] + '",'
        L = L[0:-1]
        Q1 = "请从列表{"
        Q2 = "}选出与'"
        Q3 = "'意图最接近的四句话,将结果以{question1:,question2:,question3:,question4:}输出。"
        Q4 = '输出为JSON格式。答案只能来自于列表。不要返回代码。不要输出JSON之外的东西'
        Q = Q1 + L + Q2 + self.question + Q3 + Q4
        logs = self.log_header('"llmin"')
        self.logger.info(logs + '"' + Q + '"]')
        answer, tokens = self.llm.link(Q)
        logs = self.log_header('"llmout"')
        tokens = str(tokens).replace("'", '"')
        logs = logs[0:-1] + repr(answer) + ',"tokens":' + str(tokens)
        self.logger.info(logs)
        # Extract the JSON object from whatever text surrounds it.
        begin = answer.find('{')
        end = answer.rfind('}')
        answer = answer[begin:end+1]
        answer = answer.replace('\\n', '')
        answer = answer.replace('\\"', '"')
        logs = self.log_header('"llmsearch"')
        answer = answer.replace("'", '"')
        try:
            data = json.loads(answer)
            result = []
            for key in data:
                for i in result_list:
                    if data[key] == i['name']:
                        if i not in result:
                            result.append(i)
        except Exception:
            # Unparseable LLM reply: fall back to the similarity top four.
            result = result_list[0:4]
            err_logs = self.log_header('"llmsearch"')
            err_logs = err_logs + '"Can not trans to JSON.]"'
            self.logger.error(err_logs)
        for i in result:
            logs = logs + '["' + i['name'] + '",' + i['id'] + '],'
        # Pad back up to four with the remaining candidates (the break must
        # sit inside the `if`, otherwise the while loop can spin forever
        # when the first candidate is already selected), then cap at four.
        while len(result) < 4:
            for i in result_list:
                if i not in result:
                    result.append(i)
                    break
        if len(result) > 4:
            result = result[0:4]
        logs = logs[0:-1] if logs[-1] == ',' else logs
        self.logger.info(logs + ']')

        self._log_name_id('"recommend"', result)
        return self.combine(result,
                            self.question,
                            0,
                            self.tenant_id)

+ 0
- 0
llms/__init__.py Dosyayı Görüntüle


+ 44
- 0
llms/baidullm.py Dosyayı Görüntüle

@@ -0,0 +1,44 @@
# coding=utf-8
from llms.llm import Llm
import json
import requests


class Baidullm(Llm):
    """LLM backend for Baidu ERNIE (wenxinworkshop) via its REST API."""

    def __init__(self, **param):
        self.access_url = param['access_url']
        self.api_url = param['api_url']
        self.api_key = param['api_key']
        self.api_secret = param['api_secret']

    def get_access_token(self):
        """Fetch a fresh OAuth access token from Baidu.

        Returns None when the response carries no "access_token" key.
        """
        url = self.access_url + self.api_key
        url = url + '&client_secret=' + self.api_secret
        # NOTE(review): this sends the literal two-byte body '""' to the
        # token endpoint; presumably harmless, but worth verifying.
        payload = json.dumps("")
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
        # timeout added so a dead endpoint cannot hang the caller forever
        response = requests.post(url, headers=headers, data=payload,
                                 timeout=30)
        return response.json().get("access_token")

    def link(self, question):
        """Send *question* as a single-turn chat; return (answer, tokens)."""
        url = self.api_url + self.get_access_token()
        payload = json.dumps({
            "messages": [
                {
                    "role": "user",
                    "content": question
                }
            ]
        })
        headers = {
            'Content-Type': 'application/json'
        }
        response = requests.post(url, headers=headers, data=payload,
                                 timeout=60)
        data = json.loads(response.text)
        answer = data['result']
        tokens = data['usage']['total_tokens']
        return answer, tokens

+ 29
- 0
llms/liandongllm.py Dosyayı Görüntüle

@@ -0,0 +1,29 @@
# coding=utf-8
from llms.llm import Llm
import requests
import json


class Liandongllm(Llm):
    """LLM backend for the Liandong question-answer HTTP service."""

    def __init__(self, **param):
        self.api_url = param['api_url']

    def link(self, question):
        """POST *question*; return (answer_text, 0).

        This service does not report token usage, hence the constant 0 to
        keep the (answer, tokens) contract shared by all Llm backends.
        """
        # 'Content-Type' was the invalid value 'json' and 'Content-Length'
        # was a literal Postman placeholder sent verbatim; requests computes
        # the real length itself.
        headers = {
            'Content-Type': 'application/json'
        }
        data = {
            "category": "bj_unicom",
            "messages": [
                {
                    "role": "user",
                    "content": question
                }
            ]
        }
        # timeout so a dead endpoint cannot hang the caller forever
        result = requests.post(self.api_url,
                               headers=headers,
                               data=json.dumps(data),
                               timeout=60)
        result_json = json.loads(result.text)
        return result_json['data'][0]['text'], 0

+ 12
- 0
llms/llm.py Dosyayı Görüntüle

@@ -0,0 +1,12 @@
# coding=utf-8
from abc import ABC, abstractmethod


class Llm(ABC):
    """Interface every chat-LLM backend must implement."""

    @abstractmethod
    def __init__(self, **param):
        """Configure the backend from keyword parameters (keys, URLs, ...)."""

    @abstractmethod
    def link(self, question):
        """Send *question* to the model; return (answer_text, token_count)."""

+ 154
- 0
llms/sparkllm.py Dosyayı Görüntüle

@@ -0,0 +1,154 @@
# coding=utf-8
import _thread as thread
import base64
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
from llms.llm import Llm

# Module-level accumulators shared between on_message() and Sparkllm.link().
# NOTE(review): this makes link() non-reentrant / not thread-safe — verify
# callers never invoke it concurrently.
S = ''
tokens = ''


class Ws_Param(object):
    """Holds Spark websocket credentials and builds the signed URL."""

    def __init__(self, APPID, APIKey, APISecret, gpt_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        parsed = urlparse(gpt_url)
        self.host = parsed.netloc
        self.path = parsed.path
        self.gpt_url = gpt_url

    def create_url(self):
        """Return the websocket URL carrying an HMAC-SHA256 auth query."""
        rfc1123_date = format_date_time(mktime(datetime.now().timetuple()))

        # Sign "host / date / request-line" exactly as the gateway expects.
        signing_input = "\n".join([
            "host: " + self.host,
            "date: " + rfc1123_date,
            "GET " + self.path + " HTTP/1.1",
        ])
        digest = hmac.new(self.APISecret.encode('utf-8'),
                          signing_input.encode('utf-8'),
                          digestmod=hashlib.sha256).digest()
        signature = base64.b64encode(digest).decode(encoding='utf-8')

        auth_origin = ('api_key="%s", algorithm="hmac-sha256", '
                       'headers="host date request-line", signature="%s"'
                       % (self.APIKey, signature))
        authorization = base64.b64encode(
            auth_origin.encode('utf-8')).decode(encoding='utf-8')

        query = urlencode({
            "authorization": authorization,
            "date": rfc1123_date,
            "host": self.host,
        })
        return self.gpt_url + '?' + query


def on_error(ws, error):
    # websocket-client error callback: report the failure to stdout.
    print("### error:", error)


def on_close(ws):
    # websocket-client close callback.
    print("### closed ###")


def on_open(ws):
    # Once the socket is up, send the request from a background thread so
    # the read loop inside run_forever() is not blocked.
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    """Send the opening chat request on the freshly-opened socket."""
    payload = gen_params(appid=ws.appid,
                         question=ws.question,
                         domain=ws.domain)
    ws.send(json.dumps(payload))


def on_message(ws, message):
    """Accumulate streamed content into the module-level S / tokens."""
    global S
    global tokens
    data = json.loads(message)
    code = data['header']['code']
    if code != 0:
        # Non-zero code is a protocol-level failure; stop the session.
        print(f'request fail: {code}, {data}')
        ws.close()
        return
    choices = data["payload"]["choices"]
    S = S + choices["text"][0]["content"]
    if 'usage' in data['payload'].keys():
        tokens = data['payload']['usage']['text']['total_tokens']
    if choices["status"] == 2:
        # status 2 marks the final frame of the answer stream.
        ws.close()


def gen_params(appid, question, domain):
    """Build the request payload for a single-turn Spark chat call."""
    header = {"app_id": appid, "uid": "1234"}
    chat_options = {
        "domain": domain,
        "random_threshold": 0.5,
        "max_tokens": 2048,
        "auditing": "default",
    }
    payload = {
        "message": {
            "text": [{"role": "user", "content": question}]
        }
    }
    return {
        "header": header,
        "parameter": {"chat": chat_options},
        "payload": payload,
    }


class Sparkllm(Llm):
    """LLM backend that talks to iFLYTEK Spark over a websocket."""

    def __init__(self, **param):
        self.api_url = param['api_url']
        self.app_id = param['app_id']
        self.api_key = param['api_key']
        self.api_secret = param['api_secret']
        self.domain = param['domain']

    def link(self, question):
        """Run one blocking chat exchange; return (answer, tokens)."""
        global tokens
        global S
        ws_param = Ws_Param(self.app_id,
                            self.api_key,
                            self.api_secret,
                            self.api_url)
        websocket.enableTrace(False)
        app = websocket.WebSocketApp(ws_param.create_url(),
                                     on_message=on_message,
                                     on_error=on_error,
                                     on_close=on_close,
                                     on_open=on_open)
        app.appid = self.app_id
        app.question = question
        app.domain = self.domain
        # run_forever blocks until on_message closes the socket.
        app.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
        answer, used = S, tokens
        # reset the module-level accumulators for the next call
        tokens = ''
        S = ''
        return answer, used

+ 17
- 0
llms/zhipullm.py Dosyayı Görüntüle

@@ -0,0 +1,17 @@
# coding=utf-8
import zhipuai
from llms.llm import Llm


class Zhipullm(Llm):
    """LLM backend for Zhipu's chatglm_std chat model."""

    def __init__(self, **param):
        # Zhipu authenticates through a module-level API key.
        zhipuai.api_key = param['api_key']

    def link(self, question):
        """Send *question* as a single-turn chat; return (answer, tokens)."""
        response = zhipuai.model_api.invoke(
            model="chatglm_std",
            prompt=[{"role": "user", "content": question}],
            temperature=0.9)
        payload = response['data']
        return payload['choices'][0]['content'], payload['usage']['total_tokens']

+ 63
- 0
main.py Dosyayı Görüntüle

@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
import redis
import json
import copy
import configparser
from knowledge import Knowledge
from sources import essource


# Re-answer every marked question with all the *other* LLMs and push the
# results back for side-by-side comparison.
# NOTE(review): connection settings are hard-coded here; consider moving
# them to settings.ini alongside the other endpoints.
redis_conn = redis.Redis(
    host='192.168.10.244',
    port=6381,
    db=0,
    decode_responses=True)

config = configparser.ConfigParser()
config.read("settings.ini", encoding="utf-8")
llmname = ['baidu', 'spark', 'zhipu']
llmdict = {}  # llm name -> raw LLM client
kldict = {}   # llm name -> Knowledge engine built on that LLM
for i in llmname:
    llm_config = dict(config.items(i + 'llm'))
    llms = __import__('llms.%s' % i + 'llm',
                      fromlist=['llms'])
    # getattr() replaces the previous eval(): same class lookup without
    # evaluating strings as code.
    llmdict[i] = getattr(llms, (i + 'llm').capitalize())(**llm_config)
    kldict[i] = Knowledge(i)
con = dict(config.items('essource'))
con['index'] = 'a'
es = essource.Essource(**con)

# Drain the queue; stop once it is empty.
while True:
    a = redis_conn.lpop("es_question_mark_log")
    if a is None:
        print("none")
        break
    jo = json.loads(a)
    for k in llmname:
        if k == jo['llm']:
            # The LLM that produced this entry: archive or recycle it.
            if jo['like'] < 0:
                redis_conn.rpush('es_question_marked_log', a)
            else:
                es.upload(a)
        else:
            b = copy.deepcopy(jo)
            if b['type'] == 0:
                b['answer'] = llmdict[k].link(b['question'])[0]
            else:
                temp = kldict[k].recommend(b['tenant_id'],
                                           b['question'])
                # combine() omits the 'result' key when nothing matched;
                # the old `len(temp['data']) > 0` check was always true
                # (the dict always carries question/accurate/...) and then
                # crashed on the missing key.
                if 'result' in temp['data']:
                    answers = []
                    for tempanswer in temp['data']['result']:
                        answers.append(tempanswer['name'])
                    b['answer'] = answers
                else:
                    continue
            b['llm'] = k
            b['like'] = -1
            print(b)
            redis_conn.rpush('es_question_marked_log', str(b))

+ 9
- 0
requirements.txt Dosyayı Görüntüle

@@ -0,0 +1,9 @@
dashscope==1.5.0
elasticsearch==8.9.0
Flask==2.2.2
numpy==1.23.5
PyMySQL==1.0.2
Requests==2.31.0
websocket_client==0.58.0
zhipuai==1.0.7
werkzeug==2.2.2

+ 33
- 0
settings.ini Dosyayı Görüntüle

@@ -0,0 +1,33 @@
[config]
source=sql
model=spark
embedding=dashscope
logger_path=/usr/share/knowledge/
usellm=1
[dashscopeembedding]
api_key=sk-44ccc9ab5e754eddb545cade12b632cf
[zhipuembedding]
api_key=e1c759ec620a9045360d7a90d697b88f.pjn1S2MqSS8lNmzj
[liandongembedding]
api_url=http://125.34.89.79:8000/index/embeddings
[essource]
api_url=http://8.140.53.237:9200
[sqlsource]
api_url=192.168.10.244:3308
api_id=root
api_password=Digimeta@123
[sparkllm]
api_url=ws://spark-api.xf-yun.com/v2.1/chat
domain=generalv2
app_id=948cf4b6
api_key=54f6e81f40a31d66d976496de895a7a4
api_secret=ZDYyMjNmMTlkYTE0YWRmOWUwZTYxNjYz
[baidullm]
access_url=https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=
api_key=TUuGVkpzzotFAhIIGIa0OCUO
api_secret=fv6LTCRcYhtxYb4Frs55jttOYICenCQG
api_url=https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant?access_token=
[liandongllm]
api_url=http://125.34.89.79:8000/search/questionAnswer
[zhipullm]
api_key=e1c759ec620a9045360d7a90d697b88f.pjn1S2MqSS8lNmzj

+ 0
- 0
sources/__init__.py Dosyayı Görüntüle


+ 45
- 0
sources/essource.py Dosyayı Görüntüle

@@ -0,0 +1,45 @@
# coding=utf-8
from sources.source import Source
from elasticsearch import Elasticsearch
from datetime import datetime


class Essource(Source):
    """Source backed by Elasticsearch indices."""

    def __init__(self, **param):
        self.es = Elasticsearch(param['api_url'])
        # param['index'] is a comma-separated list of index names.
        self.index = param['index'].split(',')

    def getdata(self, **param):
        """Return all live Q&A documents for param['tenant_id']."""
        result_list = []
        query = {
            "bool": {
                "must": [
                    {
                        "term": {"tenant_id": param['tenant_id']}
                    },
                    {
                        "term": {"del_flag": 0}
                    },
                    {
                        "term": {"status": '0'}
                    }
                ]
            }
        }
        # search() accepts a list of indices, so all configured indices are
        # queried at once.
        result = self.es.search(index=self.index,
                                query=query,
                                size=10000)
        for items in result['hits']['hits']:
            temp = {"name": items['_source']['name'],
                    "answer": items['_source']['answer'],
                    "id": items['_id'],
                    "model_type": items['_source']['model_type'],
                    "knowledge_lib": items['_source']['knowledge_lib']}
            result_list.append(temp)
        return result_list

    def upload(self, msg):
        """Index *msg* under a timestamp-derived document id.

        es.create() targets a single index; passing the whole list (as the
        old code did) was a bug — the first configured index is used.
        NOTE(review): a microsecond timestamp id can still collide under
        concurrent uploads; confirm whether that matters here.
        """
        timestamp = datetime.strftime(datetime.now(), '%Y%m%d%H%M%S%f')
        self.es.create(index=self.index[0],
                       id=timestamp,
                       document=msg)

+ 12
- 0
sources/source.py Dosyayı Görüntüle

@@ -0,0 +1,12 @@
# coding=utf-8
from abc import ABC, abstractmethod


class Source(ABC):
    """Interface every knowledge-base data source must implement."""

    @abstractmethod
    def __init__(self, **param):
        """Configure the source from keyword parameters (URLs, creds, ...)."""

    @abstractmethod
    def getdata(self, **param):
        """Return a list of question records matching the given filters."""

+ 41
- 0
sources/sqlsource.py Dosyayı Görüntüle

@@ -0,0 +1,41 @@
# coding=utf-8
from sources.source import Source
import pymysql


class Sqlsource(Source):
    """Source backed by a MySQL database (via PyMySQL)."""

    def __init__(self, **param):
        # index holds [question_table, knowledge_table]
        self.index = param['index'].split(',')
        host, port = param['api_url'].split(':')
        self.conn = pymysql.connect(host=host,
                                    port=int(port),
                                    user=param['api_id'],
                                    password=param['api_password'],
                                    db=param['db'],
                                    charset='utf8')
        self.cursor = self.conn.cursor()

    def getdata(self, **param):
        """Return all live questions for param['tenant_id'].

        Table names come from trusted config and are interpolated once; the
        tenant id is a runtime value and is bound as a query parameter —
        the old %-interpolation was an SQL-injection vector.
        """
        result_list = []
        sql = ('SELECT a.name,a.answer,a.id,a.type,b.name FROM '
               '%s as a LEFT JOIN %s as b ON a.knowledge_id=b.id '
               'WHERE a.del_flag=0 and a.status=0 and a.tenant_id=%%s'
               % (self.index[0], self.index[1]))
        self.cursor.execute(sql, (param['tenant_id'],))
        for items in self.cursor.fetchall():
            result_list.append({"name": items[0],
                                "answer": items[1],
                                "id": str(items[2]),
                                "model_type": items[3],
                                "knowledge_lib": items[4]})
        return result_list

    def getten(self, devId):
        """Return the tenant ids registered for device *devId*."""
        # devId is a runtime value — bound as a parameter for the same reason.
        sql = 'SELECT t_id FROM %s WHERE device_id=%%s' % self.index[0]
        self.cursor.execute(sql, (devId,))
        return self.cursor.fetchall()


+ 0
- 0
tools/__init__.py Dosyayı Görüntüle


+ 28
- 0
tools/kafka.py Dosyayı Görüntüle

@@ -0,0 +1,28 @@
import json
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback


class Kafka():
    """Thin wrapper around one Kafka consumer/producer pair.

    NOTE(review): broker address, topic and group id are hard-coded here;
    consider moving them to settings.ini like the other endpoints.
    """

    def __init__(self):
        self.kafkaCon = KafkaConsumer('es_question_mark_log',
                                      bootstrap_servers='8.140.53.237:9092',
                                      api_version=(0, 11, 5),
                                      group_id='test_kl')
        self.kafkaPro = KafkaProducer(bootstrap_servers=['8.140.53.237:9092'],
                                      api_version=(0, 11, 5),
                                      key_serializer=lambda k: json.dumps(k).encode(),
                                      value_serializer=lambda v: json.dumps(v).encode())

    def upload(self, key, msg):
        """Publish *msg* under *key*; wait up to 10s for the broker ack."""
        request = self.kafkaPro.send("es_question_mark_log",
                                     key=key,
                                     value=str(msg))
        try:
            request.get(timeout=10)
        except kafka_errors:
            # The old code built the traceback string and discarded it,
            # losing the failure entirely — surface it instead.
            print(traceback.format_exc())

    def download(self):
        """Expose the consumer so the caller can iterate messages."""
        return self.kafkaCon

+ 17
- 0
tools/logger.py Dosyayı Görüntüle

@@ -0,0 +1,17 @@
# coding=utf-8
class Logger():
    """Minimal append-only file logger with separate info and error files."""

    def __init__(self, logger_path):
        # logger_path is a directory prefix and must end with a separator,
        # matching how settings.ini configures it.
        self.info_path = logger_path + 'nlt_info.log'
        self.error_path = logger_path + 'nlt_error.log'

    def info(self, msg):
        """Append *msg* as one line to the info log."""
        self._write(self.info_path, msg)

    def error(self, msg):
        """Append *msg* as one line to the error log."""
        self._write(self.error_path, msg)

    @staticmethod
    def _write(path, msg):
        # `with` guarantees the handle is closed even if write() raises
        # (the old open/close pair leaked on error); the explicit encoding
        # keeps the Chinese log text UTF-8 on every platform instead of
        # depending on the locale default.
        with open(path, 'a', encoding='utf-8') as f:
            f.write(msg)
            f.write('\n')

+ 18
- 0
tools/myThread.py Dosyayı Görüntüle

@@ -0,0 +1,18 @@
# coding=utf-8
import threading


# 自定义线程
class MyThread(threading.Thread):
    """Thread that runs func(*args) and keeps hold of the return value."""

    def __init__(self, func, args=()):
        super(MyThread, self).__init__()
        self.func = func
        self.args = args
        # placeholder result until run() completes
        self.result = [[], -1]

    def run(self):
        # BUG FIX: args were stored but never forwarded to func; with the
        # default args=() this is backward compatible with zero-arg callers.
        self.result = self.func(*self.args)

    def get_result(self):
        """Block until the thread finishes, then return func's result."""
        self.join()
        return self.result

Yükleniyor…
İptal
Kaydet