# coding:utf-8
"""Mail keyword analysis filter.

Reads one raw e-mail from standard input, checks it against the keywords
configured for the given target user, extracts frequently occurring proper
nouns from the body with the ``mecab`` command line tool and stores the
results in MySQL.

Usage: ``script.py <target_user> [<domain>]``
"""
import os
import re
import sys
import traceback
import syslog
from datetime import *
from time import *
import time
import codecs
import email
import email.utils
from email import header
import subprocess
import shlex

try:
    import MeCab  # noqa: F401 - tokenising goes through the mecab CLI below
except ImportError:
    # The Python binding is not used by the code in this file, so its absence
    # must not prevent the helpers from being imported.
    MeCab = None

try:
    import pymysql.cursors
except ImportError:
    # Only main() talks to the database; it re-raises ImportError itself.
    pymysql = None

# When true, main() reports the failing line on stdout and exits with -1.
debug = 0
# Kill switch: when true, main() returns immediately without doing anything.
HALT = 0
SETTINGS_PATH = '/var/local/mode2/mode2_mailanalysis_settings'

_URL_RE = re.compile(r'https?:\/\/[\w/:%#\$&\?\(\)~\.=\+\-…]+')
_MAIL_ADDRESS_RE = re.compile(r'[\w\-\.+]+@[a-zA-Z0-9\-\.]+\.[a-zA-Z]+')
_RULE_LINE_RE = re.compile(r'-{2,}|={2,}')
_HTML_ENTITY_RE = re.compile(r'&[a-zA-Z0-9#]+;')
_LONG_VOWEL_LINE_RE = re.compile(r'[ー]{2,}')
_SPAM_TAG_RE = re.compile(r"\[SPAM!!\]|\[MEIWAKU!!\]")
_ALPHA_ONLY_RE = re.compile(r'^[a-zA-Z]+$')
_HTML_TAG_RE = re.compile(r"<[^>]*?>")
_ENVELOPE_ADDRESS_RE = re.compile(
    r"([a-zA-Z0-9])+([a-zA-Z0-9\._-])*@([a-zA-Z0-9_-])+([a-zA-Z0-9\._-]+)")


class KeywordRetriever:
    """Extract frequently occurring proper nouns from a text using mecab."""

    # Maximum number of non-blank body lines that are analysed.
    MAX_BODY_LINES = 200
    # A word must occur at least this many times to be reported.
    MIN_OCCURRENCES = 3
    # Maximum number of keywords returned by countWord().
    MAX_KEYWORDS = 30

    def __init__(self):
        # Words counted by the most recent countWord() call.
        self.vocabularies = set()
        # word -> number of occurrences for the most recent countWord() call.
        self.wordappear = {}
        # Tokens that are never reported (date parts, header names, ...).
        self.blackwords = ['年', '月', '日', '~', 'Forwarded', 'Message',
                           'Subject', 'Date', 'From', 'To', 'TEL', 'FAX',
                           'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul',
                           'Aug', 'Sep', 'Oct', 'Nov', 'Dec', 'Sun', 'Mon',
                           'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'jp', 'at',
                           'pp', 'org', 'AM', 'PM', 'URL', 'Web', 'WEB', 'JP']

    def countWord(self, data, dic):
        """Return the most frequent proper nouns found in ``data``.

        Args:
            data: Mail body text.
            dic: ``1`` to additionally load the neologd user dictionary,
                anything else for the system dictionary only.

        Returns:
            A list of ``(word, count)`` tuples sorted by descending count,
            restricted to words occurring at least ``MIN_OCCURRENCES`` times
            and capped at ``MAX_KEYWORDS`` entries.

        Raises:
            OSError: if the mecab executable cannot be started.
        """
        if dic == 1:
            option = "-Ochasen -r /etc/mecabrc -u /usr/local/lib/mecab/dic/neologd/custom.dic"
        else:
            option = "-Ochasen -r /etc/mecabrc"

        # Counts are per call: reusing one instance for two dictionaries must
        # not add the first run's counts to the second run's result.
        self.vocabularies = set()
        self.wordappear = {}

        lines = [format_text(line) for line in data.split('\n')
                 if line.strip() != ''][:self.MAX_BODY_LINES]
        if lines:
            self._count_tokens(self._run_mecab(lines, option))
        return self._ranking()

    def _run_mecab(self, lines, option):
        """Run mecab once over ``lines`` and return its decoded stdout."""
        cmd = f'/usr/bin/mecab {option}'
        syslog.syslog(f'cmd: {cmd}')
        # mecab analyses its input line by line, so a single process handles
        # every line. The argument list is passed without a shell.
        proc = subprocess.run(shlex.split(cmd.strip()),
                              input=('\n'.join(lines) + '\n').encode('utf-8'),
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
        if proc.returncode != 0:
            syslog.syslog('mecab failed: '
                          + proc.stderr.decode('utf-8', 'replace').strip())
            return ''
        return proc.stdout.decode('utf-8', 'replace')

    def _count_tokens(self, chasen_output):
        """Tally the accepted words of a ``-Ochasen`` formatted mecab output.

        Each token line is expected to be tab separated as
        ``surface, reading, base form, POS hierarchy joined by '-',
        conjugation type, conjugation form``; lines with fewer than four
        columns (for example ``EOS``) are skipped.
        """
        for token_line in chasen_output.split('\n'):
            columns = token_line.split('\t')
            if len(columns) < 4:
                continue
            word = columns[0]
            # Rebuild the five leading feature fields the filter looks at;
            # levels omitted by the chasen format are padded with '*'.
            feature = columns[3].split('-')
            feature += ['*'] * (4 - len(feature))
            conj_type = columns[4] if len(columns) > 4 and columns[4] else '*'
            feature = feature[:4] + [conj_type]
            if self._is_keyword(word, feature):
                self.wordappear[word] = self.wordappear.get(word, 0) + 1
                self.vocabularies.add(word)

    def _is_keyword(self, word, feature):
        """Return True for proper nouns that should be counted."""
        # NOTE(review): the ``feature[4] != '名'`` test is kept verbatim from
        # the original condition; with the column layout above it looks like
        # it is always true - confirm the intended personal-name filter.
        return (feature[0] == '名詞' and feature[1] == '固有名詞'
                and ((feature[2] != '人名'
                      and (feature[3] != '姓' or feature[4] != '名'))
                     or (feature[2] == '人名' and feature[3] == '一般'))
                and len(word) > 1 and word not in self.blackwords)

    def _ranking(self):
        """Return the counted words ordered by frequency, filtered and capped."""
        ranked = sorted(self.wordappear.items(),
                        key=lambda item: item[1], reverse=True)
        frequent = [item for item in ranked
                    if item[1] >= self.MIN_OCCURRENCES]
        return frequent[:self.MAX_KEYWORDS]


def get_settings(domain=False):
    """Load the ``key=value`` settings file holding the DB credentials.

    Args:
        domain: Optional domain name; when given, ``_<domain>`` is appended
            to ``SETTINGS_PATH`` to select a per-domain settings file.

    Returns:
        A dict mapping each key to its value (newline removed), or ``False``
        when the file does not exist or cannot be read.
    """
    domain_str = "_" + domain if domain else ""
    setting_file = f'{SETTINGS_PATH}{domain_str}'
    try:
        if not os.path.isfile(setting_file):
            syslog.syslog("can't open the setting file")
            return False
        settings = {}
        with open(setting_file, 'r') as settings_file:
            for entry in settings_file:
                # Split on the first '=' only so values may contain '='.
                key, separator, value = entry.partition("=")
                if not separator:
                    # Blank or malformed line: skip it instead of failing
                    # the whole file.
                    continue
                settings[str(key)] = value.replace("\n", "")
        return settings
    except Exception as e:
        syslog.syslog(str(e))
        return False


def format_text(text):
    """Strip URLs, mail addresses, rule lines and HTML entities from ``text``.

    Non-string input is returned unchanged (best effort).
    """
    try:
        text = _URL_RE.sub("", text)  # URL
        text = _MAIL_ADDRESS_RE.sub("", text)  # mail address
        text = _RULE_LINE_RE.sub("", text)  # --- === rule lines
        text = _HTML_ENTITY_RE.sub("", text)  # HTML entities
        text = _LONG_VOWEL_LINE_RE.sub("", text)  # rule lines made of 'ー'
    except TypeError:
        pass
    return text


def spam_check(subject):
    """Return True when ``subject`` carries a ``[SPAM!!]``/``[MEIWAKU!!]`` tag."""
    try:
        return _SPAM_TAG_RE.search(subject) is not None
    except TypeError:
        return False


def check_english_token(keyword_list0, keyword_list1):
    """Return True when the top five keywords are purely alphabetic.

    Each argument is a list of ``(word, count)`` tuples as returned by
    ``KeywordRetriever.countWord``. A list with fewer than five entries is
    ignored; if neither list has five entries the result is ``False``.
    """
    joined = ""
    for keyword_list in (keyword_list0, keyword_list1):
        if len(keyword_list) > 4:
            for word, *_ in keyword_list[:5]:
                if isinstance(word, bytes):
                    word = word.decode('utf-8')
                joined += word
    return _ALPHA_ONLY_RE.match(joined) is not None


class MailParser(object):
    """Parse a raw e-mail and expose its headers and text body."""

    def __init__(self, data):
        """Parse ``data``, the raw message given as ``str`` or ``bytes``.

        Sets ``title``, ``sender``, ``receiver``, ``date`` (a
        ``time.struct_time``), ``msgid`` and ``body``. Missing text headers
        become ``''``.

        Raises:
            ValueError: if the Date header is missing or cannot be parsed.
        """
        self.files = {}
        self.body = ""
        # Parsing bytes keeps 8-bit bodies intact for charset decoding.
        if isinstance(data, bytes):
            msg = email.message_from_bytes(data)
        else:
            msg = email.message_from_string(data)
        self.title = self._decode_header(msg.get('Subject'))
        self.sender = self._decode_header(msg.get('From'))
        self.receiver = self._decode_header(msg.get('To'))
        self.date = self.get_format_date(msg.get('Date'))
        self.msgid = self._decode_header(msg.get('Message-ID'))
        # The body is the first part without a file name that yields text.
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get_filename():
                continue
            self.body = self.decode_body(part)
            if self.body != "":
                break

    def _decode_header(self, dec_target):
        """Decode an RFC 2047 encoded header value to ``str``.

        Returns ``''`` when the header is missing (``None``).
        """
        if dec_target is None:
            return ''
        ret = ''
        for frag, enc in header.decode_header(dec_target):
            if isinstance(frag, bytes):
                # Fragments without a charset are still bytes when the header
                # contains at least one encoded word.
                try:
                    frag = frag.decode(enc or 'ascii', 'replace')
                except LookupError:
                    frag = frag.decode('utf-8', 'replace')
            ret += frag
        return ret

    @staticmethod
    def _payload_text(part):
        """Return the transfer-decoded payload of ``part`` as ``str``."""
        raw = part.get_payload(decode=True)
        if raw is None:
            return ''
        charset = part.get_content_charset() or 'utf-8'
        try:
            return raw.decode(charset, 'ignore')
        except LookupError:
            # Unknown charset label: fall back instead of dropping the mail.
            return raw.decode('utf-8', 'ignore')

    def decode_body(self, part):
        """Return the text of ``part``.

        ``text/plain`` is returned as is, ``text/html`` with its tags
        removed; for a multipart container the last ``text/plain`` child is
        used. Any other content type yields ``''``.
        """
        if part.is_multipart():
            body = ''
            for payload in part.get_payload():
                if payload.get_content_type() == "text/plain":
                    body = self._payload_text(payload)
            return body
        content_type = part.get_content_type()
        if content_type == "text/plain":
            return self._payload_text(part)
        if content_type == "text/html":
            return _HTML_TAG_RE.sub("", self._payload_text(part))
        return ''

    def get_format_date(self, date_string):
        """Convert an RFC 2822 Date header to a ``time.struct_time``.

        The time zone is discarded, i.e. the result carries the wall-clock
        time written in the header. Accepts forms such as
        ``Wed, 12 Dec 2007 19:18:10 +0900 (JST)``,
        ``Wed, 12 Dec 2007 19:18:10 PDT`` and ``3 Jan 2012 17:58:09``.

        Raises:
            ValueError: if the header is missing or not a valid date.
        """
        parsed = None
        if date_string is not None:
            parsed = email.utils.parsedate(str(date_string))
        if parsed is None:
            raise ValueError(f'unparseable Date header: {date_string!r}')
        # Round-trip through datetime to validate the fields and to fill in
        # weekday / day-of-year like time.strptime() does.
        return datetime(*parsed[:6]).timetuple()


def failure(e):
    """Print ``<line>:<exception type>`` for ``e`` and exit with status -1."""
    tb = e.__traceback__ or sys.exc_info()[2]
    lineno = tb.tb_lineno if tb is not None else '?'
    print(str(lineno) + ":" + str(type(e)))
    sys.exit(-1)


def _placeholders(count):
    """Return ``count`` comma separated ``%s`` SQL placeholders."""
    return ', '.join(['%s'] * count)


def _count_keyword_hits(keyword, text):
    """Count matches of ``keyword`` (used as a regular expression) in ``text``.

    A keyword that is not a valid pattern is counted as a literal string so
    that one bad setting does not abort the whole mail.
    """
    try:
        return len(re.findall(keyword, text))
    except re.error:
        return text.count(keyword)


def _extract_address(value):
    """Return the first mail address found in ``value``, else ``value``."""
    found = _ENVELOPE_ADDRESS_RE.search(value)
    return found.group() if found is not None else value


def _insert_row(connection, table, row):
    """Insert the ``row`` dict (column -> value) into ``table`` and commit."""
    sql = "INSERT INTO %s (%s) VALUES (%s)" % (
        table, ', '.join(row), _placeholders(len(row)))
    with connection.cursor() as cursor:
        cursor.execute(sql, list(row.values()))
    connection.commit()


def _process_mail(connection, user):
    """Analyse the mail on stdin for ``user`` and store the results."""
    # Categories the user is a target of.
    with connection.cursor() as cursor:
        cursor.execute(
            "SELECT category_id FROM mail_target_settings "
            "WHERE target_user=%s", (user,))
        target_category_by_user = [row['category_id']
                                   for row in cursor.fetchall()]
    if not target_category_by_user:
        return

    # Keywords configured for those categories.
    with connection.cursor() as cursor:
        cursor.execute(
            "SELECT * FROM mail_keyword_settings WHERE category_id IN (%s)"
            % _placeholders(len(target_category_by_user)),
            target_category_by_user)
        keywords = cursor.fetchall()

    mail = MailParser(sys.stdin.buffer.read())
    if spam_check(mail.title):
        return

    # Explicitly configured keywords that occur in the body.
    target = []
    for k in keywords:
        hits = _count_keyword_hits(k['keyword'], mail.body)
        if hits > 0:
            target.append(dict(category_id=k['category_id'], tf=hits,
                               keyword=k['keyword']))
    if not target:
        return

    matched_list = list({t['category_id'] for t in target}
                        & set(target_category_by_user))
    if not matched_list:
        return

    # Split the matched categories by the dictionary they are analysed with.
    with connection.cursor() as cursor:
        cursor.execute(
            "SELECT category_id, dic FROM mail_category_settings "
            "WHERE category_id IN (%s)" % _placeholders(len(matched_list)),
            matched_list)
        dic_rows = cursor.fetchall()
    matched_list0 = [r['category_id'] for r in dic_rows if r['dic'] == 0]
    matched_list1 = [r['category_id'] for r in dic_rows if r['dic'] != 0]

    nb = KeywordRetriever()
    keyword_list0 = nb.countWord(mail.body, 0) if matched_list0 else []
    keyword_list1 = nb.countWord(mail.body, 1) if matched_list1 else []

    # English mails are not stored.
    if check_english_token(keyword_list0, keyword_list1):
        return

    msg_id = mail.msgid.replace("<", "").replace(">", "")
    updated = datetime.now()

    # Related keywords, once per category using the respective dictionary.
    for keyword_list, categories in ((keyword_list0, matched_list0),
                                     (keyword_list1, matched_list1)):
        for word, count in keyword_list:
            for cate in categories:
                _insert_row(connection, 'mail_message_keyword', {
                    'msg_id': msg_id,
                    'tf': count,
                    'keyword': word,
                    'updated': updated,
                    'category_id': cate,
                })

    # Explicitly configured keywords.
    for hit in target:
        _insert_row(connection, 'mail_message_keyword', {
            'msg_id': msg_id,
            'tf': hit['tf'],
            'keyword': hit['keyword'],
            'updated': updated,
            'category_id': hit['category_id'],
        })

    # Meta information of the mail the keywords were found in.
    _insert_row(connection, 'mail_mail_info', {
        'msg_id': msg_id,
        'msg_date': mail.date,
        'mailfrom': _extract_address(mail.sender),
        'mailto': _extract_address(mail.receiver),
        'subject': mail.title,
        'updated': updated,
    })


def main(argv=None):
    """Script entry point: ``argv[1]`` is the target user, ``argv[2]`` an
    optional domain selecting the settings file.

    Errors are written to syslog; with ``debug`` set, ``failure()`` is called
    as well.
    """
    if pymysql is None:
        raise ImportError('pymysql is required to run the mail analysis')
    argv = sys.argv if argv is None else argv
    connection = None
    try:
        syslog.syslog("keyword_analysis start")
        if HALT:
            return
        if len(argv) < 2:
            return
        user = argv[1]
        domain = argv[2] if len(argv) == 3 else False

        syslog.syslog("get setting start")
        settings = get_settings(domain)
        if not settings:
            return
        # Log the setting names only: the values include the DB password.
        syslog.syslog("settings loaded: " + ", ".join(sorted(settings)))
        syslog.syslog("get setting end")

        connection = pymysql.connect(host='localhost',
                                     user=settings['mysql_user'],
                                     password=settings['mysql_password'],
                                     db=settings['mysql_db'],
                                     charset='utf8',
                                     cursorclass=pymysql.cursors.DictCursor)
        _process_mail(connection, user)
    except Exception as e:
        syslog.syslog(traceback.format_exc())
        if debug:
            failure(e)
    finally:
        if connection is not None:
            connection.close()


if __name__ == "__main__":
    if hasattr(sys.stdout, 'buffer'):
        # Emit UTF-8 regardless of the locale; the writer must wrap the
        # binary buffer because it hands encoded bytes to the stream.
        sys.stdout = codecs.getwriter('utf_8')(sys.stdout.buffer)
    main()