feat: editor template

This commit is contained in:
anjingyu 2025-07-15 14:50:16 +08:00
parent 9547759ab3
commit 88531ed44d
15 changed files with 2455 additions and 1 deletions

View File

@ -17,6 +17,6 @@ build-backend = "poetry.core.masonry.api"
[[tool.poetry.source]]
name = "tsinghua"
url = "https://opentuna.cn/pypi/web/simple"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
priority = "default"

6
rule-editor/README.md Normal file
View File

@ -0,0 +1,6 @@
# Rule Script Editor
A simple rule script editor based on `Tkinter` and `Python3`.
Reference: https://cadernodelaboratorio.com.br/en/class-18-syntax-highlighting-with-pygments/

26
rule-editor/main.py Normal file
View File

@ -0,0 +1,26 @@
#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# Version 2, December 2004
#
# Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
#
# Everyone is permitted to copy and distribute verbatim or modified
# copies of this license document, and changing it is allowed as long
# as the name is changed.
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
#
# 0. You just DO WHAT THE FUCK YOU WANT TO.
#
# Copyright (c) 2022- donkey <anjingyu_ws@foxmail.com>
from rseditor.app import App
if __name__ == "__main__":
app = App()
app.mainloop()

View File

@ -0,0 +1,14 @@
# For Mainland, the TSingHua pip source is recommended.
# If you want to use the official source,
# please comment or remove the following line.
--index-url "https://pypi.tuna.tsinghua.edu.cn/simple"
ttkbootstrap
ttkwidgets
matplotlib
pygments
requests
chlorophyll
# Installation helpers
# pyinstaller
# pycrypto
# tinyaes

View File

446
rule-editor/rseditor/app.py Normal file
View File

@ -0,0 +1,446 @@
#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# Version 2, December 2004
#
# Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
#
# Everyone is permitted to copy and distribute verbatim or modified
# copies of this license document, and changing it is allowed as long
# as the name is changed.
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
#
# 0. You just DO WHAT THE FUCK YOU WANT TO.
#
# Copyright (c) 2022- donkey <anjingyu_ws@foxmail.com>
import json
import platform
import tkinter as tk
import tkinter.filedialog
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
from ttkbootstrap.localization import MessageCatalog
from ttkwidgets import CheckboxTreeview, DebugWindow
from .rule_code_view import RuleCodeView
from .status_bar import StatusBar
from .rule_service import RuleService, RuleCreateDialog
from .rule_tester import RuleTester
from .rule_options import RuleOptionsDialog
__all__ = ["App"]
__version__ = ".".join(map(lambda x: str(x), [1, 0, 0]))
class App(ttk.Window):
TITLE = "Rule Editor"
THEME = "pulse"
DEFAULT_WIDTH = 800
DEFAULT_HEIGHT = 660
def __init__(self):
super().__init__(title=App.TITLE, themename=App.THEME)
MessageCatalog.locale("en_us")
w = App.DEFAULT_WIDTH
h = App.DEFAULT_HEIGHT
self.__resize_center(w, h)
self.minsize(w, h)
self.protocol("WM_DELETE_WINDOW", self.__quit)
self.__theme_var = ttk.StringVar()
self.__theme_var.set(App.THEME)
self.__style = ttk.Style()
self.__debug = ttk.BooleanVar()
# Update font for Linux
self.__change_font()
self.__rules = {}
self.__code_views = {}
self.__rs = None
self.__setup_ui()
def __change_font(self):
if platform.system() == "Linux":
self.__style.configure("TButton", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TLabel", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TLabelframe", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TMenubutton", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TFrame", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TNotebook", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TTreeview", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TScrollbar", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TCombobox", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TEntry", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TCheckbutton", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TRadiobutton", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TScale", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TSpinbox", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("Table.Treeview", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("Treeview", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("Toolbutton", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TCalendar", font=("Noto Sans Mono CJK SC", 14))
self.__style.configure("TNotebook.Tab", font=("Noto Sans Mono CJK SC", 14))
def __resize_center(self, w, h):
# get screen width and height
ws = self.winfo_screenwidth()
hs = self.winfo_screenheight()
# calculate position x, y
x = int((ws / 2) - (w / 2))
y = int((hs / 2) - (h / 2))
self.geometry(f"{w}x{h}+{x}+{y}")
def __quit(self):
self.destroy()
self.quit()
def __append_treeview(self, ft: dict):
for name, vs in ft.items():
self.tv.insert("", END, iid=name, text=name)
if not isinstance(vs, dict):
continue
for val in vs:
vname = "{}.{}".format(name, val)
self.tv.insert(name, END, iid=vname, text=val)
def __remove_treeview(self):
for item in self.tv.get_children():
self.tv.delete(item)
def __enable_debug(self):
# Open the debug view
pass
def __change_theme(self):
self.__style.theme_use(self.__theme_var.get())
def __about(self):
ttk.dialogs.Messagebox.ok(
message=f"© 2022- donkey <anjingyu_ws@foxmail.com>. All rights reserved.\nVersion: {__version__}",
title="About",
)
def __setup_menu(self):
self.__menu_bar = ttk.Menu(self)
self.config(menu=self.__menu_bar)
file_menu = ttk.Menu(self.__menu_bar, tearoff=0)
file_menu.add_command(label="Add Rules", command=self.__add_rule)
file_menu.add_checkbutton(
label="Debug",
onvalue=True,
offvalue=False,
variable=self.__debug,
command=self.__enable_debug,
)
file_menu.add_command(label="Quit", command=self.__quit)
theme_menu = ttk.Menu(self.__menu_bar, tearoff=0)
for theme in self.__style.theme_names():
theme_menu.add_radiobutton(
label=theme,
value=theme,
variable=self.__theme_var,
command=self.__change_theme,
)
help_menu = ttk.Menu(self.__menu_bar, tearoff=0)
help_menu.add_command(label="About", command=self.__about)
self.__menu_bar.add_cascade(label="File", menu=file_menu)
self.__menu_bar.add_cascade(label="Theme", menu=theme_menu)
self.__menu_bar.add_cascade(label="Help", menu=help_menu)
self.winfo_toplevel().bind("<Control-q>", lambda event: self.__quit)
def __add_rule(self):
def _create(cname: str, ft: int):
category, name = cname.split(".")
self.__rules[category][name] = {
"obj": {"name": "", "type": "script", "value": ""},
"id": [],
"formulaType": category,
"functionTag": ft,
"formulaRuleName": name,
"curveDisplayKey": None,
"ruleDesc": "",
}
self.tv.insert(category, END, iid=cname, text=name)
rcd = RuleCreateDialog(
"Create a new rule/expression and bind to a drive system.",
"Create New Rule/Expr",
_create,
self,
)
rcd.show()
def __update_rule(self, new_rule_name: str, obj: dict):
type_name = obj["formulaType"]
old_name = obj["formulaRuleName"]
niid = f"{type_name}.{new_rule_name}"
if old_name != new_rule_name:
self.__rules[type_name][new_rule_name] = obj
if old_name in self.__rules[type_name]:
del self.__rules[type_name][old_name]
self.__rules[type_name][new_rule_name]["obj"]["name"] = new_rule_name
self.__rules[type_name][new_rule_name]["formulaRuleName"] = new_rule_name
self.__remove_treeview()
self.__append_treeview(self.__rules)
iid = f"{type_name}.{old_name}"
if iid in self.__code_views:
try:
self.rframe.forget(self.__code_views[iid][0])
except Exception:
try:
print("index:", self.rframe.index(self.__code_views[iid][0]))
except Exception:
print("[DEBUG] Failed to get the index")
del self.__code_views[iid]
self.__update_tab_page(niid)
elif niid in self.__code_views:
self.__code_views[niid][1].update(
self.__rules[type_name][new_rule_name]["obj"]
)
def __delete_rule(self):
rs = self.rs()
if not rs:
return
selected = self.tv.get_checked()
if selected:
ret = ttk.dialogs.Messagebox.yesno(
message="Are you sure you want to delete these rules or expressions?",
title="Delete Entries",
)
if not ret or ret.lower() != "yes":
return
for sel in selected:
category, name = sel.split(".")
ids = self.__rules[category][name]["id"]
rs.delete_rules(ids)
del self.__rules[category][name]
self.tv.delete(sel)
if sel in self.__code_views:
code_frame = self.__code_views[sel][0]
del self.__code_views[sel]
self.rframe.forget(code_frame)
def __do_test(self, file_path, log_type):
selected = self.tv.get_checked()
if not selected:
ttk.dialogs.Messagebox.show_error(
message="Please select at least one rule or expression that you want to test.",
title="No Entry Selected",
)
return
rt = RuleTester(self.__debug.get(), self.__debug.get())
# construct rule list
rule_list = []
for sel in selected:
category, name = sel.split(".")
obj = self.__rules[category][name]["obj"]
if sel in self.__code_views:
code_editor = self.__code_views[sel][1]
# update the value and mapping
code_editor.update_obj(obj)
rule_list.append(obj)
DebugWindow(self, stderr=True)
self.update()
self.after(500, lambda: rt.test(file_path, rule_list, True))
def __upload(self):
rs = self.rs()
if not rs:
return
selected = self.tv.get_checked()
if selected:
for sel in selected:
category, name = sel.split(".")
code_editor = self.__code_views[sel][1]
rule = self.__rules[category][name]
obj = rule["obj"]
ids = rule["id"]
# update the value and mapping
code_editor.update_obj(obj)
if ids:
for mid in ids:
data = {
"id": mid,
"formulaRules": json.dumps(obj),
"formulaType": category,
"functionTag": rule["functionTag"],
"curveDisplayKey": rule["curveDisplayKey"],
}
rs.push_rule(data)
else:
item = {
"formulaRules": json.dumps(obj),
"formulaType": category,
"functionTag": rule["functionTag"],
"curveDisplayKey": rule["curveDisplayKey"],
}
rs.add_rule(item)
self.__statusbar.show_info(f"Rules update successfully")
else:
ttk.dialogs.Messagebox.show_warning(
message=f"Please select at least one rule or expression.",
title="No Entry Selected",
)
def __close_tab(self, event):
if event.widget.identify(event.x, event.y) == "label":
index = event.widget.index(f"@{event.x},{event.y}")
iid = event.widget.tab(index, "text")
event.widget.forget(index)
# Save data into object
category, name = iid.split(".")
self.__code_views[iid][1].update_obj(self.__rules[category][name]["obj"])
def __update_tab_page(self, iid: str):
if "." in iid:
category, name = iid.split(".")
self.__statusbar.show_info(f"{category.capitalize()} [{name}] is selected")
if iid not in self.__code_views:
code_frame = ttk.Frame(self.rframe)
code_editor = RuleCodeView(code_frame)
# Append content
code_editor.update(self.__rules[category][name]["obj"])
self.rframe.add(code_frame, text=iid)
code_editor.pack(fill=BOTH, expand=True)
self.rframe.select(code_frame)
self.__code_views[iid] = (code_frame, code_editor)
else: # Switch this tab
try:
self.rframe.index(self.__code_views[iid][0])
self.rframe.select(self.__code_views[iid][0])
except Exception:
self.rframe.add(self.__code_views[iid][0], text=iid)
self.__code_views[iid][1].update(self.__rules[category][name]["obj"])
else:
self.__statusbar.show_info("")
def __select_item(self, event):
assert event.widget == self.tv
iid = event.widget.identify_row(event.y)
self.__update_tab_page(iid)
# The event handler of TreeView Context Menu
def __show_menu(self, event):
iid = self.tv.identify_row(event.y)
if "." in iid:
self.tv.selection_set(iid)
self.tv_ctx_menu.post(event.x_root, event.y_root)
def __rule_options(self):
if self.tv.selection():
iid = self.tv.selection()[0]
if "." in iid:
catagory, name = iid.split(".")
opt_dialog = RuleOptionsDialog(
self.__rules[catagory][name],
self,
fn=self.__update_rule,
debug=self.__debug.get(),
)
opt_dialog.show()
def __setup_ui(self):
self.__setup_menu()
self.main_frame = ttk.Frame(self, relief=RAISED, borderwidth=1)
self.main_frame.pack(fill=BOTH, expand=True)
self.main_frame.columnconfigure(0, pad=7)
self.main_frame.columnconfigure(1, pad=7, weight=1)
self.main_frame.rowconfigure(0, pad=7, weight=1)
self.main_frame.rowconfigure(1, pad=1)
self.main_frame.rowconfigure(2, pad=0)
self.lframe = ttk.Labelframe(self.main_frame, text="Messages", bootstyle="info")
self.tv = CheckboxTreeview(master=self.lframe, show=TREE, selectmode=EXTENDED)
# Update the row height on Linux
if platform.system() == "Linux":
style = ttk.Style()
style.configure(
"Custom.Treeview",
font=("Noto Sans Mono CJK SC", 14, "normal"),
rowheight=20,
)
self.tv.configure(style="Custom.Treeview")
# Define the context menu for the rule entries
self.tv_ctx_menu = tk.Menu(self, tearoff=0)
self.tv_ctx_menu.add_command(label="Options ...", command=self.__rule_options)
self.tv.bind("<Button-3>", self.__show_menu)
self.tv.bind("<Double-Button-1>", self.__select_item)
self.tv.column("#0", width=240, stretch=YES)
self.tv.pack(side=LEFT, anchor=NE, fill=BOTH)
self.lframe.grid(row=0, column=0, padx=1, sticky=NSEW)
self.rframe = ttk.Notebook(self.main_frame)
self.rframe.bind("<ButtonRelease-3>", self.__close_tab)
self.rframe.grid(row=0, column=1, padx=1, sticky=NSEW)
self._btn_frame = ttk.Frame(master=self.main_frame)
self._btn_frame.grid(row=1, column=0, columnspan=2, padx=1, pady=1, sticky=NSEW)
self.__upload_btn = ttk.Button(
self._btn_frame,
text="Upload",
bootstyle="success-outline",
command=self.__upload,
)
self.__upload_btn.pack(side=RIGHT, padx=5, pady=5)
self.__delete_btn = ttk.Button(
self._btn_frame,
text="Delete",
style="danger-outline",
command=self.__delete_rule,
)
self.__delete_btn.pack(side=LEFT, padx=5, pady=5)
self.__statusbar = StatusBar(self.main_frame)
self.__statusbar.grid(
row=2, column=0, columnspan=2, padx=0, pady=0, sticky=NSEW
)

View File

@ -0,0 +1,83 @@
#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# Version 2, December 2004
#
# Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
#
# Everyone is permitted to copy and distribute verbatim or modified
# copies of this license document, and changing it is allowed as long
# as the name is changed.
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
#
# 0. You just DO WHAT THE FUCK YOU WANT TO.
#
# Copyright (c) 2020- anjingyu <anjingyu_ws@foxmail.com>
import json
import traceback
import pygments
import pygments.lexers
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
from chlorophyll import CodeView
class RuleCodeView(ttk.Frame):
def __init__(self, master, **kwargs):
super().__init__(master, **kwargs)
self.__pane = ttk.PanedWindow(master, orient=VERTICAL)
self.__pane.pack(fill=BOTH, expand=True)
self.__codef = ttk.Frame(self.__pane)
self.__mappingf = ttk.Frame(self.__pane)
self.__code = CodeView(self.__codef, pygments.lexers.PythonLexer, "dracula")
self.__mapping = CodeView(self.__mappingf, pygments.lexers.JsonLexer, "dracula")
self.__code.pack(fill=BOTH, expand=True)
self.__mapping.pack(fill=BOTH, expand=True)
self.__codef.pack(side=TOP, fill=BOTH, expand=True)
self.__mappingf.pack(side=TOP, fill=BOTH, expand=True)
self.__pane.add(self.__codef)
self.__pane.add(self.__mappingf)
def update(self, rule_obj: dict):
self.__code.delete("1.0", END)
self.__code.insert(END, rule_obj["value"])
if "mapping" in rule_obj and rule_obj["mapping"]:
self.__mapping.delete("1.0", END)
self.__mapping.insert(END, json.dumps(rule_obj["mapping"], indent=4))
def update_obj(self, rule_obj: dict):
code_ret = []
code = self.__code.get("1.0", END)
for line in code.strip().splitlines():
ls = line.strip()
if not ls:
continue
if ls.startswith("#"):
continue
code_ret.append(line)
rule_obj["value"] = "\n".join(code_ret)
mapping = self.__mapping.get("1.0", END)
if mapping.strip():
try:
mapping_obj = json.loads(mapping)
rule_obj["mapping"] = mapping_obj
except Exception:
traceback.print_exc()

View File

@ -0,0 +1,267 @@
#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# Version 2, December 2004
#
# Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
#
# Everyone is permitted to copy and distribute verbatim or modified
# copies of this license document, and changing it is allowed as long
# as the name is changed.
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
#
# 0. You just DO WHAT THE FUCK YOU WANT TO.
#
# Copyright (c) 2022- donkey <anjingyu_ws@foxmail.com>
import json
import copy
import traceback
import tkinter
from typing import Callable
import pygments
import pygments.lexers
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
from ttkbootstrap.dialogs.dialogs import Dialog
from chlorophyll import CodeView
from .rule_service import RuleService
__author__ = ['"anjingyu" <anjingyu_ws@foxmail.com>']
__all__ = ["RuleOptionsDialog"]
class RuleOptionsDialog(Dialog):
def __init__(
self,
rule_obj: dict,
parent: tkinter.Misc = None,
fn: Callable = lambda name, obj: None,
debug: bool = False,
):
super().__init__(
parent,
f"{rule_obj['formulaRuleName']} ({rule_obj['formulaType']}) Properties",
)
self._obj = rule_obj
self._width = 400
self._padding = (20, 5)
self._tag_selected = -1
self._function_tags = copy.deepcopy(RuleService.function_tag_cache())
self._fn = fn
self._new_rule_name = ""
self.__debug = debug
self.__uis = {}
def _change_function_tag(self, e):
self._tag_selected = self._function_tags[e.widget.get()]
def create_body(self, master):
self.nb = ttk.Notebook(master)
frame = ttk.Frame(master=self.nb, padding=self._padding)
frame.columnconfigure(0, weight=3)
frame.columnconfigure(1, weight=7)
# Name
ttk.Label(master=frame, text="Name:").grid(
row=0, column=0, sticky=W, padx=5, pady=5
)
self.__uis["name"] = ttk.Entry(master=frame)
self.__uis["name"].grid(row=0, column=1, padx=5, pady=5)
self.__uis["name"].insert(0, self._obj["formulaRuleName"])
self._new_rule_name = self._obj["formulaRuleName"]
# Function tag
ttk.Label(master=frame, text="FunctionTag:", anchor=W).grid(
row=1, column=0, sticky=W, padx=5, pady=5
)
fts_keys = list(self._function_tags.keys())
tag_combobox = ttk.Combobox(master=frame, values=fts_keys, state=READONLY)
tag = 0
for _, idx in self._function_tags.items():
if idx == self._obj["functionTag"]:
self._tag_selected = tag
tag_combobox.current(tag)
break
tag += 1
tag_combobox.bind("<<ComboboxSelected>>", self._change_function_tag)
self.__uis["function_tag"] = tag_combobox
self.__uis["function_tag"].grid(row=1, column=1, padx=5, pady=5)
# Formula type
ttk.Label(master=frame, text="FormulaType:", anchor=W).grid(
row=2, column=0, sticky=W, padx=5, pady=5
)
assert (
self._obj["formulaType"] in RuleService.TV_LIST
), f"Invalid formula type: {self._obj['formulaType']}, options: {RuleService.TV_LIST}"
self.__uis["formula_type"] = ttk.Label(
master=frame, text=self._obj["formulaType"], anchor=W
)
self.__uis["formula_type"].grid(row=2, column=1, padx=5, pady=5)
ttk.Label(master=frame, text="Description:", anchor=W).grid(
row=3, column=0, sticky=W, padx=5, pady=5
)
self.__uis["description"] = ttk.Text(master=frame, height=3, wrap=WORD)
self.__uis["description"].grid(row=3, column=1, padx=5, pady=5)
if "ruleDesc" in self._obj and self._obj["ruleDesc"]:
self.__uis["description"].insert("1.0", self._obj["ruleDesc"])
else:
self.__uis["description"].insert("1.0", "")
frame.pack(fill=X, padx=5, pady=5)
rule_obj = self._obj["obj"]
code_frame = ttk.Frame(master=self.nb)
self.__uis["code_cv"] = CodeView(
code_frame, pygments.lexers.PythonLexer, "dracula"
)
self.__uis["code_cv"].pack(fill=BOTH, expand=True)
code_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
self.__uis["code_cv"].delete("1.0", END)
self.__uis["code_cv"].insert(END, rule_obj["value"])
mapping_frame = ttk.Frame(master=self.nb)
self.__uis["mapping_cv"] = CodeView(
mapping_frame, pygments.lexers.JsonLexer, "dracula"
)
self.__uis["mapping_cv"].pack(fill=BOTH, expand=True)
mapping_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
if "mapping" in rule_obj:
self.__uis["mapping_cv"].delete("1.0", END)
self.__uis["mapping_cv"].insert(
END, json.dumps(rule_obj["mapping"], indent=4)
)
curve_display_frame = ttk.Frame(master=self.nb)
self.__uis["curve_display_cv"] = CodeView(
curve_display_frame, pygments.lexers.JsonLexer, "dracula"
)
self.__uis["curve_display_cv"].pack(fill=BOTH, expand=True)
curve_display_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
if self._obj["curveDisplayKey"]:
self.__uis["curve_display_cv"].delete("1.0", END)
self.__uis["curve_display_cv"].insert(
END, json.dumps(json.loads(self._obj["curveDisplayKey"]), indent=4)
)
self.nb.add(frame, text="Basic")
self.nb.add(code_frame, text="Code")
self.nb.add(mapping_frame, text="Mapping")
self.nb.add(curve_display_frame, text="CurveDisplay")
self.nb.pack(expand=True, fill=BOTH, padx=5, pady=5)
def create_buttonbox(self, master):
"""Overrides the parent method; adds the message buttonbox"""
frame = ttk.Frame(master, padding=(5, 10))
submit = ttk.Button(
master=frame,
style="primary",
text="Submit",
command=self.on_submit,
)
submit.pack(padx=5, side=RIGHT)
submit.lower() # set focus traversal left-to-right
cancel = ttk.Button(
master=frame,
style="secondary",
text="Cancel",
command=self.on_cancel,
)
cancel.pack(padx=5, side=RIGHT)
cancel.lower() # set focus traversal left-to-right
ttk.Separator(self._toplevel).pack(fill=X)
frame.pack(side=BOTTOM, fill=X, anchor=S)
def on_submit(self, *_):
"""Save result, destroy the toplevel, and apply any post-hoc
data manipulations."""
valid_result = self.validate()
if not valid_result:
return # keep toplevel open for valid response
self._toplevel.destroy()
self.apply()
def on_cancel(self, *_):
"""Close the toplevel and return empty."""
self._toplevel.destroy()
def validate(self):
rule_obj = self._obj["obj"]
name = self.__uis["name"].get().strip()
if not name:
ttk.dialogs.Messagebox.show_warning(
message="Name MUST be not empty!", title="Invalid values"
)
return False
self._new_rule_name = name
if self._tag_selected >= 0:
self._obj["functionTag"] = self._tag_selected
self._obj["ruleDesc"] = self.__uis["description"].get("1.0", END).strip()
code = self.__uis["code_cv"].get("1.0", END)
code_ret = []
for line in code.strip().splitlines():
ls = line.strip()
if not ls:
continue
if ls.startswith("#"):
continue
code_ret.append(line)
rule_obj["value"] = "\n".join(code_ret)
mapping = self.__uis["mapping_cv"].get("1.0", END)
if mapping.strip():
try:
mapping_obj = json.loads(mapping)
rule_obj["mapping"] = mapping_obj
except Exception:
traceback.print_exc()
ttk.dialogs.Messagebox.show_warning(
message="The content of `Mapping` MUST be JSON!",
title="Invalid values",
)
return False
curve_display = self.__uis["curve_display_cv"].get("1.0", END)
if curve_display.strip():
try:
_ = json.loads(curve_display)
self._obj["curveDisplayKey"] = curve_display
except Exception:
traceback.print_exc()
ttk.dialogs.Messagebox.show_warning(
message="The content of `CurveDisplay` MUST be JSON!",
title="Invalid values",
)
return False
return True
def apply(self):
self._fn(self._new_rule_name, self._obj)

View File

@ -0,0 +1,154 @@
#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# Version 2, December 2004
#
# Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
#
# Everyone is permitted to copy and distribute verbatim or modified
# copies of this license document, and changing it is allowed as long
# as the name is changed.
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
#
# 0. You just DO WHAT THE FUCK YOU WANT TO.
#
# Copyright (c) 2022- donkey <anjingyu_ws@foxmail.com>
import os
import json
import requests
import textwrap
import traceback
from collections import OrderedDict
from typing import Callable
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
from ttkbootstrap.dialogs.dialogs import Dialog
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# The rules format:
#
# [
# {
# "id": <int>,
# "formulaRules": "",
# "formulaType": "script or expression",
# "curveDisplayKey": "[...]",
# "functionTag": 1
# },
# ...
# ]
class RuleCreateDialog(Dialog):
def __init__(
self,
prompt="",
title="",
fn=None,
parent=None,
width=65,
datatype=str,
padding=(20, 20),
debug: bool = False,
):
super().__init__(parent, title)
self._prompt = prompt
self._width = width
self._datatype = datatype
self._padding = padding
self._ft_selected = ""
self._checkbox_val = ttk.BooleanVar()
self._checkbox_val.set(False)
self._result = None
self.__debug = debug
self.__fn = fn if fn else lambda name, ft: None
rs = RuleService(debug)
ret, msg = rs.login()
if not ret:
ttk.dialogs.Messagebox.show_warning(message=msg, title="Failed to login")
return
# Request function tags
self._function_tag_dict = rs.list_function_tag()
def create_body(self, master):
frame = ttk.Frame(master, padding=self._padding)
if self._prompt:
for p in self._prompt.split("\n"):
prompt = "\n".join(textwrap.wrap(p, width=self._width))
prompt_label = ttk.Label(frame, text=prompt)
prompt_label.pack(pady=(0, 5), fill=X, anchor=N)
entry = ttk.Entry(master=frame)
entry.pack(pady=(0, 5), fill=X)
entry.bind("<Return>", self.on_submit)
entry.bind("<KP_Enter>", self.on_submit)
entry.bind("<Escape>", self.on_cancel)
if self._function_tag_dict:
keys = list(self._function_tag_dict.keys())
combo_label = ttk.Label(frame, text="FunctionTag:")
combobox = ttk.Combobox(master=frame, values=keys, state=READONLY)
combo_label.pack(pady=(0, 5), side=LEFT)
combobox.pack(padx=(5, 0), pady=(0, 5), side=LEFT)
combobox.current(0)
combobox.bind("<<ComboboxSelected>>", self._change_tag)
self._ft_selected = keys[0]
chkbox = ttk.Checkbutton(frame, text="Expression", variable=self._checkbox_val)
chkbox.pack(padx=(5, 0), pady=(0, 5), side=LEFT)
frame.pack(fill=X, expand=True)
self._initial_focus = entry
def _change_tag(self, e):
self._ft_selected = e.widget.get()
def create_buttonbox(self, master):
"""Overrides the parent method; adds the message buttonbox"""
frame = ttk.Frame(master, padding=(5, 10))
submit = ttk.Button(
master=frame,
style="primary",
text="Submit",
command=self.on_submit,
)
submit.pack(padx=5, side=RIGHT)
submit.lower() # set focus traversal left-to-right
cancel = ttk.Button(
master=frame,
style="secondary",
text="Cancel",
command=self.on_cancel,
)
cancel.pack(padx=5, side=RIGHT)
cancel.lower() # set focus traversal left-to-right
ttk.Separator(self._toplevel).pack(fill=X)
frame.pack(side=BOTTOM, fill=X, anchor=S)
def on_submit(self, *_):
self._result = self._initial_focus.get().strip()
if not self._result:
return
self._toplevel.destroy()
self.apply()
def on_cancel(self, *_):
self._toplevel.destroy()
def apply(self):
name = "{}.{}".format(
"expression" if self._checkbox_val.get() else "rule", self._result
)
self.__fn(name, self._function_tag_dict[self._ft_selected])

View File

@ -0,0 +1,277 @@
#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# Version 2, December 2004
#
# Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
#
# Everyone is permitted to copy and distribute verbatim or modified
# copies of this license document, and changing it is allowed as long
# as the name is changed.
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
#
# 0. You just DO WHAT THE FUCK YOU WANT TO.
#
# Copyright (c) 2020- donkey <anjingyu_ws@foxmail.com>
import os
import sys
import json
import time
import threading
import queue
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.abspath(os.path.join(SCRIPT_DIR, "..", "..", "..", "service")))
from nsse.script_engine import ScriptEngine, ExpressionEngine, ExpressionEngineHelper
from nsse.result_submitter import ResultSubmitter
class RealtimeData:
def __init__(self, debug: bool = False, default_pkg_frame_num: int = 10):
self.__default_pkg_frame_num = default_pkg_frame_num
self.__realtime_queue = queue.Queue()
self.__running = False
self.__thread = None
self.__count = 0
self.__debug = debug
def __run(self):
while self.__running:
if self.__realtime_queue.qsize() < self.__default_pkg_frame_num:
# Sleep 50 ms if there is not data
time.sleep(0.05)
continue
# Pack the 10 data frames into one package
i = 0
data = {}
while (
not self.__realtime_queue.empty() and i < self.__default_pkg_frame_num
):
item = self.__realtime_queue.get()
for k, v in item.items():
for key, val in v.items():
tkey = f"{k}:{key}"
if tkey not in data:
data[tkey] = []
data[tkey].append(val)
i += 1
self.__count += 1
if self.__debug and data:
print(json.dumps(data, indent=2))
if not self.__realtime_queue.empty():
data = {}
while not self.__realtime_queue.empty():
item = self.__realtime_queue.get()
for k, v in item.items():
for key, val in v.items():
tkey = f"{k}:{key}"
if tkey not in data:
data[tkey] = []
data[tkey].append(val)
self.__count += 1
if data:
print("Realtime Data:")
print(json.dumps(data, indent=2))
print(f"Upload {self.__count} frames")
def startup(self):
if not self.__running:
self.__running = True
self.__thread = threading.Thread(target=RealtimeData.__run, args=(self,))
self.__thread.start()
def shutdown(self):
if self.__running:
self.__running = False
if self.__thread:
self.__thread.join()
self.__thread = None
def put(self, item):
self.__realtime_queue.put(item)
class RuleTester:
def __init__(
self, debug: bool = False, test_realtime: bool = False, wait_time: float = 5.0
):
self.__debug = debug
self.__test_realtime = test_realtime
self.__wait_time = wait_time
self.__submitter = ResultSubmitter()
def test(self, dfpath: str, rules: list, in_thread: bool = False):
if not os.path.isfile(dfpath):
print(f"[ERROR] Dataframe file does not exist: {dfpath}")
return
if in_thread:
t = threading.Thread(
target=RuleTester.__do_test, args=(self, dfpath, rules)
)
t.start()
else:
self.__do_test(dfpath, rules)
def __do_expr(self, exprs: list, rules_output: dict, strict: bool = True) -> tuple:
expr_mapping_tables = {}
exprs_output = {}
error_msg = {}
ee = ExpressionEngine(debug=self.__debug)
for expr in exprs:
(
succ,
error_msg,
mapping_table,
) = ExpressionEngineHelper.construct_mapping_table(
expr, rules_output, strict
)
if not succ:
if strict:
error_msg[
"expression"
] = f"Failed to generate mapping table for {expr['name']}"
return (not error_msg), error_msg, exprs_output, expr_mapping_tables
else:
continue
succ, emsg, ret = ee.execute(expr, mapping_table)
if succ:
exprs_output[expr["name"]] = ret
expr_mapping_tables[expr["name"]] = mapping_table
elif strict:
error_msg["expression"] = {expr["name"]: emsg}
return (not error_msg), error_msg, exprs_output, expr_mapping_tables
return (not error_msg), error_msg, exprs_output, expr_mapping_tables
def __do_test(self, dfpath: str, rules: list):
start_time = time.time()
rd = None
if self.__test_realtime:
rd = RealtimeData()
rd.startup()
data = json.loads(open(dfpath).read())
dfs = data["result"]["data"]
se = ScriptEngine(debug=self.__debug)
rule_scripts = []
rule_exprs = []
for rule in rules:
if rule["type"] == "script":
rule_scripts.append(rule)
mapping = {}
if "mapping" in rule:
for k, v in rule["mapping"].items():
mapping[k] = json.loads(v)
succ, error_msg = ScriptEngine.check_script(
rule["value"], names=mapping, frame=dfs[0]
)
if not succ:
print("Check Rule:", rule["name"], error_msg)
if rd:
rd.shutdown()
return
elif rule["type"] == "expression":
rule_exprs.append(rule)
succ, error_msg = se.compile(rule_scripts)
if not succ:
print("Compiling:", error_msg)
if rd:
rd.shutdown()
return
succ, error_msg = se.do_preprocess()
if not succ:
print(error_msg)
if rd:
rd.shutdown()
return
if rd and se.get_realtime_var():
rd.put(se.get_realtime_var_copy())
count = 0
for df in dfs:
succ, error_msg = se.do_step(df)
if not succ:
print(error_msg)
if rd:
rd.shutdown()
return
if rd and se.get_realtime_var():
rd.put(se.get_realtime_var_copy())
# Check intermediate result, if there are intermediate
# results, upload immediately
if se.get_intermediate_var() and rule_exprs:
iv = se.get_intermediate_var_copy()
succ, emsg, exprs_output, emtables = self.__do_expr(
rule_exprs, iv, False
)
if not succ:
print(f"[ERROR] {json.dumps(emsg, indent=2)}")
else:
r = self.__submitter.construct_post_result(
iv, exprs_output, rule_exprs, emtables, False
)
print(f"[DEBUG] ***** Intermediate Results *****")
print(json.dumps(r, indent=2))
count += 1
if self.__test_realtime:
# Wait for the realtime data uploader
if self.__wait_time > 0:
print(
f"[DEBUG] Waiting for {self.__wait_time} seconds to wait the realtime data"
)
time.sleep(self.__wait_time)
succ, error_msg = se.do_postprocess()
if not succ:
print(error_msg)
if rd:
rd.shutdown()
return
if rd and se.get_realtime_var():
rd.put(se.get_realtime_var_copy())
if rd:
rd.shutdown()
print("Dataframe count:", count)
print(json.dumps(se.get_output_var(), indent=2))
# Expression
output_var = se.get_output_var()
succ, emsg, exprs_output, expr_mapping_tables = self.__do_expr(
rule_exprs, output_var
)
r = self.__submitter.construct_post_result(
output_var, exprs_output, rule_exprs, expr_mapping_tables, emsg
)
print(json.dumps(r, indent=2))
print("Time consumed: {} s".format(round(time.time() - start_time, 3)))

View File

@ -0,0 +1,691 @@
#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
#
# NavInfo Software Copyright Notice
# Version 1, March 2022
#
# Copyright (c) 2022- NavInfo Co., Ltd. All Rights Reserved.
#
# The NavInfo software and associated documentation files shall not
# be used, reproduced, distribute, redistribute or otherwise transmitted
# in part or whole, in any form or reason, in the absence of NavInfo's
# express and written consent.
#
# Unless otherwise stated by NavInfothis software is provided "AS IS".
# To the maximum extent permitted by law, NavInfo disclaims any warranty
# including implied warranty and does not assume any liability, in
# particular as to the accuracy, faultlessness, fitness for any purpose,
# freedom of any third parties rights or completeness.
#
# Contact us: NavInfo Legal Department <legal@navinfo.com>
import sys
import math
import ast
import json
import re
import copy
import traceback
__all__ = ["ScriptEngine", "ExpressionEngine", "ExpressionEngineHelper"]
__author__ = ['"anjingyu" <anjingyu@navinfo.com>']
class RuleScript:
LOG_ENABLED = True
def __init__(self):
pass
def preprocess(self):
pass
def step(self, df) -> dict:
return {}
def postprocess(self) -> dict:
return {}
def get_intermediate(self) -> dict:
return {}
def debug(self, *argv, **kwargv):
if RuleScript.LOG_ENABLED:
print(f"[DEBUG] [{self.__class__.__name__}]:", *argv, **kwargv)
def output_desc(self) -> list:
return []
def input_desc(self) -> list:
return []
class ExpressionEngineHelper:
@staticmethod
def extract_result_from_rules(
expr: dict, expr_results: dict, rule_results: dict
) -> list:
ret = []
if expr["name"] in expr_results and "mapping" in expr:
for key, val in expr["mapping"].items():
succ, error_msg, rule_info = ExpressionEngineHelper.extract_mapping_var(
val
)
if not succ:
print(f"[WARNING] Failed to extract mapping variable: {error_msg}")
continue
rule_name, rule_key = rule_info
if rule_name in rule_results:
if rule_key not in rule_results[rule_name]:
print(f"[WARNING] {rule_key} does not exist in {rule_name}")
continue
rdfs = []
if isinstance(rule_results[rule_name][rule_key], list):
rdfs.extend(rule_results[rule_name][rule_key])
else:
rdfs.append(rule_results[rule_name])
for rdf in rdfs:
rule_key_sfno = f"{rule_key}_START_ID"
rule_key_efno = f"{rule_key}_END_ID"
rule_key_ts = f"{rule_key}_START_TIME"
rule_key_desc = f"{rule_key}_DESCRIPTION"
if (
rule_key_sfno not in rdf
or rule_key_efno not in rdf
or rule_key_ts not in rdf
):
print(
f"[WARNING] {rule_key_sfno} or {rule_key_efno} or {rule_key_ts} do not exist: {rule_name}.{rule_key}"
)
continue
ret.append(
{
"name": key,
"sfno": rdf[rule_key_sfno],
"efno": rdf[rule_key_efno],
"ts": rdf[rule_key_ts],
"desc": ""
if rule_key_desc not in rdf
else rdf[rule_key_desc],
}
)
return ret
@staticmethod
def extract_mapping_var(val: str) -> tuple:
"""Extract the required rule indices from the value string of the mapping table."""
error_msg = ""
rule = ""
# <rule>:<variable>
if ":" in val:
splitted_items = val.split(":")
if len(splitted_items) > 2:
error_msg = (
"The pattern <rule>:<variable> can only contains only one `:`"
)
return (not error_msg), error_msg, None
rule = splitted_items[0]
val = splitted_items[1]
# TODO(anjingyu): To support the pattern `x.y.z` -> CurrentFrame['x']['y']['z']`
# if '.' in val:
return (not error_msg), error_msg, (rule, val)
@staticmethod
def construct_mapping_table(
expr: dict, output_vars: dict, strict: bool = True
) -> tuple:
"""Construct mapping table from expression's mapping dictionary and rule's output variables.
Args:
expr: The expression object should be associated
output_vars: The output mapping table of rule scripts
strict: If we want to output the intermediate results the `output_vars` may be not contains all the required variables, so we only process the matched result.
Returns:
(bool, str, dict) -> (succ, error string, mapping table)
"""
error_msg = ""
mapping_table = {}
emapping = {}
for kkey, val in expr["mapping"].items():
succ, error_msg, rv = ExpressionEngineHelper.extract_mapping_var(val)
if succ:
if not rv[0]:
error_msg = (
f"Rule prefix is required in current version: {expr['name']}"
)
print(f"[ERROR] {error_msg}!")
return (not error_msg), error_msg, None
emapping[kkey] = rv
else:
error_msg = f"Failed to extract mappting values: {expr['name']}"
print(f"[ERROR] {error_msg}")
return (not error_msg), error_msg, None
for key, val in emapping.items():
if val[0] not in output_vars:
if strict:
raise Exception(f"{val[0]} is not a valid rule")
else:
continue
if val[1] not in output_vars[val[0]]:
if strict:
raise Exception(
f"{val[1]} does not exist in output variables of {val[0]}"
)
else:
mapping_table[expr["mapping"][key]] = output_vars[val[0]][val[1]]
return (not error_msg), error_msg, mapping_table
class ExpressionEngine:
"""Simple expression parser, replace the variables in expression to specific mapping variable."""
def __init__(self, debug: bool = False):
self.__debug = debug
self.__allow_modules = {"math": math}
self.__allow_names = {}
def execute(
self,
expression: dict,
mapping_table: dict = {},
allow_modules: dict = {},
allow_names: dict = {},
) -> tuple:
"""Replace the expression with Python syntax, parse and run the expression"""
error_msg = ""
if not isinstance(expression, dict):
error_msg = "Type Error: expression object must be a dict object."
if self.__debug:
print(f"[ERROR] {error_msg}")
return (not error_msg), error_msg, None
# Check the format of expression
if "type" not in expression or expression["type"] != "expression":
error_msg = "Type Error: expression object must be a dict object."
if self.__debug:
print(f"[ERROR] {error_msg}")
return (not error_msg), error_msg, None
# Check the format of expression
if "name" not in expression or "value" not in expression:
error_msg = "Key Error: `name` and `value` must exist."
if self.__debug:
print(f"[ERROR] {error_msg}")
return (not error_msg), error_msg, None
allow_modules.update(self.__allow_modules)
allow_names.update(self.__allow_names)
if "mapping" in expression:
var_mapping = expression["mapping"]
for key, val in var_mapping.items():
if val in mapping_table:
allow_names[key] = mapping_table[val]
else:
error_msg = f"Required key <{val}> is missing!"
if self.__debug:
print(f"[WARNING] {error_msg}")
return (not error_msg), error_msg, None
expr = expression["value"]
expr = re.sub(r"\s*&&\s*", " and ", expr)
expr = re.sub(r"\s*\|\|\s*", " or ", expr)
expr = re.sub(r"\s*!\s*", " not ", expr)
if self.__debug:
print(f"[DEBUG] Converted expression: {expression['value']} -> {expr}")
result = None
try:
result = eval(expr, {"__builtins__": allow_modules}, allow_names)
except Exception as e:
if self.__debug:
traceback.print_exc()
_, _, tb = sys.exc_info()
line_number = traceback.extract_tb(tb)[-1][1]
error_msg = "{}(line:{}): {}".format(
e.__class__.__name__, line_number, str(e)
)
return (not error_msg), error_msg, result
class ScriptEngine:
"""A simple script engine based on Python 'exec' mechanism.
This script engine manage the life cycle of scripts via the following three procedures:
- preprocess: Do initialization, create some global variables that can be used in the future processes.
- step: Do the real jobs, deal with the realtime dataframes.
- postprocess: Output some result."""
def __init__(self, debug: bool = False):
self.__debug = debug
self.__output_var = {}
self.__realtime_var = {}
self.__intermediate_var = {}
self.__globals = {}
self.__default_globals = {
"__builtins__": {
"__build_class__": globals()["__builtins__"]["__build_class__"],
"__name__": globals()["__builtins__"]["__name__"],
"dict": dict,
"list": list,
"bool": bool,
"str": str,
"int": int,
"float": float,
"super": super,
"map": map,
"zip": zip,
"abs": abs,
"len": len,
"round": round,
},
"math": math,
"RuleScript": RuleScript,
}
self.__rules = {}
self.__real_rules = []
if self.__debug:
setattr(RuleScript, "LOG_ENABLED", True)
@staticmethod
def preprocess(source: str) -> str:
"""Preprocess the source code string, remove these directives:
- `import`
- `from ... import`"""
# import os
#
# import os, \\
# sys
#
# import os \\
subs = re.sub(
r"^\s*import\s+([\w, \t]*(\s*\\\s*\r*\n*)*)*", "", source, flags=re.M | re.S
)
# from sys import (path, argv)
#
# from sys import (path, \\
# argv)
subs = re.sub(
r"^\s*from\s*\w+\s*import\s*\(([\w, \t]*(\s*\\*\s*\r*\n*))*\)",
"",
subs,
flags=re.M | re.S,
)
# from sys import path
#
# from sys import path, \\
# argv
subs = re.sub(
r"^\s*from\s*\w+\s*import\s+([\w, \t]*(\s*\\*\s*\r*\n*)*)*",
"",
subs,
flags=re.M | re.S,
)
return subs
@staticmethod
def __check_ast(parsed) -> tuple:
error_list = []
for node in ast.walk(parsed):
if isinstance(
node,
(
ast.Import,
ast.ImportFrom, # import or from ... import
ast.AsyncFunctionDef, # async function
ast.Await, # await
ast.Global,
ast.Nonlocal, # global, nonloal
ast.Yield,
ast.YieldFrom, # yield, yield from
ast.Try,
ast.ExceptHandler, # try ... except
ast.Raise,
),
): # raise
rc = {}
if hasattr(node, "end_lineno"):
rc["line"] = node.end_lineno
if hasattr(node, "end_col_offset"):
rc["column"] = node.end_col_offset
error_list.append(rc)
return (not error_list), error_list
def __implemented(self, derived: type, base: type, method: str) -> bool:
return getattr(derived, method) is not getattr(base, method)
def append_modules(self, rule_name: str, modules: dict):
if rule_name not in self.__globals:
self.__globals[rule_name] = {}
self.__globals[rule_name].update(modules)
def compile(self, rules: list) -> tuple:
"""Compile and cache the rules, the rules is a list of dictionary.
Args:
rules: A list of rules that should be run in the future steps.
Each rule in the rule list is a dict object like this:
{"name": "<rule name>", "value": "<rule code string>", ...},
the keys "name" and "value" MUST exist.
Returns:
A tuple contains: (successfully or not, a dict of error messages if any exception occurs)
The format of the error message dict is:
{"<rule name>": "<error message>", "<rule name>": "<error message>", ...}"""
error_msg = {}
succ = True
name_dict = {}
for rule in rules:
# Check rules
if not isinstance(rule, dict):
if self.__debug:
print("[ERROR] A `rule` must be a dict object")
error_msg[str(rule)] = "A `rule` must be a dict object"
succ = False
continue
if "name" not in rule:
if self.__debug:
print("[ERROR] The `rule` dict must contain the key: `name`")
error_msg[str(rule)] = "The `rule` dict must contain the key: `name`"
succ = False
continue
if "value" not in rule:
if self.__debug:
print("[ERROR] The `rule` dict must contain the key: `value`")
error_msg[
rule["name"]
] = "The `rule` dict must contain the key: `value`"
succ = False
continue
rule_name = rule["name"]
if rule_name in name_dict:
if self.__debug:
print(f"[ERROR] The rule name `{rule_name}` has existed!")
error_msg[rule["name"]] = f"The rule name `{rule_name}` has existed!"
succ = False
break
# Collect the rule names
name_dict[rule_name] = True
# Attempt to compile the rule script to Python AST,
# then check forbidden keywords and features
try:
rule_script_str = ScriptEngine.preprocess(rule["value"])
parsed = ast.parse(rule_script_str)
tsucc, error_list = ScriptEngine.__check_ast(parsed)
if not tsucc:
succ = False
if self.__debug:
print(
"Use the forbidden keywords or features in rule script: {}, {}".format(
rule_name, json.dumps(error_list, indent=2)
)
)
error_msg[
rule["name"]
] = "Use the forbidden keywords or features in rule script: {}, {}".format(
rule_name, json.dumps(error_list, indent=2)
)
continue
compiled_rule = compile(rule_script_str, "<string>", "exec")
except Exception as e:
succ = False
if self.__debug:
traceback.print_exc()
_, _, tb = sys.exc_info()
line_number = traceback.extract_tb(tb)[-1][1]
error_msg[rule_name] = "{}(line:{}): {}".format(
e.__class__.__name__, line_number, str(e)
)
continue
rule_item = {"name": rule_name, "value": compiled_rule}
if "mapping" in rule:
try:
rule_item["mapping"] = {}
for k, v in rule["mapping"].items():
rule_item["mapping"][k] = json.loads(v)
except Exception as e:
succ = False
msg = "{}: Failed to load the mapping table, not a valid JSON string: {}".format(
rule_name, str(e)
)
if self.__debug:
print(f"[ERROR] {msg}")
error_msg[rule_name] = msg
continue
self.append_modules(rule_name, self.__default_globals)
if "mapping" in rule_item:
self.append_modules(rule_name, rule_item["mapping"])
self.__rules[rule_name] = rule_item
temp_locals = {}
for rule_name, rs in self.__rules.items():
try:
exec(rs["value"], self.__globals[rule_name], temp_locals)
except Exception as e:
if self.__debug:
traceback.print_exc()
_, _, tb = sys.exc_info()
line_number = traceback.extract_tb(tb)[-1][1]
error_msg[rule_name] = "{}(line:{}): {}".format(
e.__class__.__name__, line_number, str(e)
)
for sbk, sbv in temp_locals.items():
if isinstance(sbv, type) and issubclass(sbv, RuleScript):
for rule_name in self.__rules:
self.append_modules(rule_name, {sbk: sbv})
if (
self.__implemented(sbv, RuleScript, "preprocess")
and self.__implemented(sbv, RuleScript, "step")
and self.__implemented(sbv, RuleScript, "postprocess")
):
self.__real_rules.append([sbk, sbv, None])
if self.__debug:
print(f"[DEBUG] Rule script class {sbk} is loaded.")
elif self.__debug:
print(
f"[DEBUG] {sbk} is not a real rule script, maybe an utility class."
)
return succ, error_msg
def do_preprocess(self) -> tuple:
"""Run the preprocess routine, as the first step in script engine operation.
Returns:
A tuple contains a bool to indicate whether this routine runs successfully,
and a dict contains all the error message if any rule script failed."""
error_msg = {}
for rule in self.__real_rules:
try:
mycls = rule[1]()
mycls.preprocess()
rule[2] = mycls
except Exception as e:
if self.__debug:
traceback.print_exc()
_, _, tb = sys.exc_info()
line_number = traceback.extract_tb(tb)[-1][1]
error_msg[rule[0]] = "{}(line:{}): {}".format(
e.__class__.__name__, line_number, str(e)
)
return (not error_msg), error_msg
def do_step(self, frame: dict) -> tuple:
"""Run one step, using frame as the dataframe input for each rule.
Args:
frame: User-defined dataframe.
Returns:
A tuple contains a bool to indicate whether this routine runs successfully,
and a dict contains all the error message if any rule script failed.
Note:
You must run this after do_preprocess."""
error_msg = {}
self.__realtime_var = {}
self.__intermediate_var = {}
for rule in self.__real_rules:
rule_name = rule[0]
try:
ret = rule[2].step(frame)
if isinstance(ret, dict) and len(ret) > 0:
self.__realtime_var[rule_name] = ret
iv = rule[2].get_intermediate()
if isinstance(iv, dict) and len(iv) > 0:
self.__intermediate_var[rule_name] = iv
except Exception as e:
if self.__debug:
traceback.print_exc()
_, _, tb = sys.exc_info()
line_number = traceback.extract_tb(tb)[-1][1]
error_msg[rule_name] = "{}(line:{}): {}".format(
e.__class__.__name__, line_number, str(e)
)
return (not error_msg), error_msg
def do_postprocess(self) -> tuple:
"""Run the final step, finish the life cycle of the rule scripts, construct the output data.
Returns:
A tuple contains a bool to indicate whether this routine runs successfully,
a dict contains all the error message if any rule script failed.
Note:
After this function, you can retrieve the output variables via get_output_var, but please do this before do_preprocess,
because do_preprocess will reset all the cached states and data."""
error_msg = {}
self.__output_var = {}
for rule in self.__real_rules:
rule_name = rule[0]
try:
ret = rule[2].postprocess()
if isinstance(ret, dict) and len(ret) > 0:
self.__output_var[rule_name] = ret
elif isinstance(ret, list) and len(ret) > 0:
self.__output_var[rule_name] = ret
except Exception as e:
if self.__debug:
traceback.print_exc()
_, _, tb = sys.exc_info()
line_number = traceback.extract_tb(tb)[-1][1]
error_msg[rule_name] = "{}(line:{}): {}".format(
e.__class__.__name__, line_number, str(e)
)
return (not error_msg), error_msg
def get_output_var(self) -> dict:
return self.__output_var
def get_output_var_copy(self) -> dict:
return copy.deepcopy(self.__output_var)
def get_intermediate_var(self) -> dict:
return self.__intermediate_var
def get_intermediate_var_copy(self) -> dict:
return copy.deepcopy(self.__intermediate_var)
def get_realtime_var(self) -> dict:
return self.__realtime_var
def get_realtime_var_copy(self) -> dict:
return copy.deepcopy(self.__realtime_var)
@property
def rules(self) -> dict:
return self.__rules
@staticmethod
def check_script(
input_string: str,
modules: dict = {},
names: dict = {},
frame: dict = {},
debug: bool = False,
) -> tuple:
error_msg = {}
try:
rule_script_str = ScriptEngine.preprocess(input_string)
parsed = ast.parse(rule_script_str)
tsucc, error_list = ScriptEngine.__check_ast(parsed)
if not tsucc:
if debug:
print(
"Use the forbidden keywords or features in rule script: {}".format(
json.dumps(error_list, indent=2)
)
)
error_msg[
"COMPILE"
] = "Use the forbidden keywords or features in rule script: {}".format(
json.dumps(error_list, indent=2)
)
return (not error_msg), error_msg
except Exception as e:
if debug:
traceback.print_exc()
_, _, tb = sys.exc_info()
line_number = traceback.extract_tb(tb)[-1][1]
error_msg["COMPILE"] = "{}(line:{}): {}".format(
e.__class__.__name__, line_number, str(e)
)
return (not error_msg), error_msg
try:
exec(input_string)
except Exception as e:
if debug:
traceback.print_exc()
_, _, tb = sys.exc_info()
line_number = traceback.extract_tb(tb)[-1][1]
error_msg["PREPROCESS"] = "{}(line:{}): {}".format(
e.__class__.__name__, line_number, str(e)
)
return (not error_msg), error_msg

View File

@ -0,0 +1,409 @@
#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
#
# NavInfo Software Copyright Notice
# Version 1, March 2022
#
# Copyright (c) 2022- NavInfo Co., Ltd. All Rights Reserved.
#
# The NavInfo software and associated documentation files shall not
# be used, reproduced, distribute, redistribute or otherwise transmitted
# in part or whole, in any form or reason, in the absence of NavInfo's
# express and written consent.
#
# Unless otherwise stated by NavInfothis software is provided "AS IS".
# To the maximum extent permitted by law, NavInfo disclaims any warranty
# including implied warranty and does not assume any liability, in
# particular as to the accuracy, faultlessness, fitness for any purpose,
# freedom of any third parties rights or completeness.
#
# Contact us: NavInfo Legal Department <legal@navinfo.com>
import json
import sys
import os
import re
import time
import traceback
import requests
from script_engine import ScriptEngine
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DEBUG = True
# Helper functions
def get_timestamp(ct: float = 0):
if ct == 0:
ct = time.time()
local_time = time.localtime(ct)
data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
data_secs = int((ct - int(ct)) * 1000)
return f"{data_head}.{data_secs}"
class WaitPolicy:
def __init__(
self, init_val: float = 0.5, max_val: float = 10.0, func=lambda c: c * 2
):
self.__init_val = init_val
self.__max_val = max_val
self.__func = func
self.__curr = init_val
def next_interval(self):
cur = self.__curr
if self.__curr < self.__max_val:
self.__curr = self.__func(self.__curr)
if self.__curr > self.__max_val:
self.__curr = self.__max_val
return cur
def reset(self):
self.__curr = self.__init_val
class Waiter:
def __init__(self, wp: WaitPolicy, retry_times: int = 5, debug: bool = False):
self.__wp = wp
self.__debug = debug
self.__counter = 0
self.__retry_times = retry_times
def wait(self):
interval = self.__wp.next_interval()
if self.__debug:
print(f"[D] Will sleep {interval} seconds")
try:
time.sleep(interval)
except Exception:
if self.__debug:
traceback.print_exc()
self.__counter += 1
def retry_failure(self) -> bool:
if self.__debug:
print(f"[D] Already retry {self.__counter} times.")
return self.__counter >= self.__retry_times
def reset(self):
self.__wp.reset()
self.__counter = 0
class ScriptLoader:
"""Load the script files recursively and return the script list."""
def __init__(self):
self.__timeout = 5
# Waiter for online testing
self.__waiter = Waiter(WaitPolicy(), debug=DEBUG)
self.__retry_waiter = Waiter(WaitPolicy(init_val=1.0), debug=DEBUG)
self.__rules = []
def load(self, script_dir_path: str) -> list:
"""Load the script files recursively and return the script list."""
self.__rules = []
# Process the common rules firstly
script_dir_path = os.path.abspath(script_dir_path)
self.__rules.extend(self.__process_dir(script_dir_path))
# Process the script files in subdirectories
for root, dirs, _ in os.walk(script_dir_path):
for d in dirs:
self.__rules.extend(self.__process_dir(os.path.join(root, d)))
return self.__rules
def __remove_import(self, source: str) -> str:
"""Preprocess the source code string, remove these directives:
- `import`
- `from ... import`"""
# import os
#
# import os, \\
# sys
#
# import os \\
subs = re.sub(
r"^\s*import\s+([\w, \t]*(\s*\\\s*\r*\n*)*)*", "", source, flags=re.M | re.S
)
# from sys import (path, argv)
#
# from sys import (path, \
# argv)
subs = re.sub(
r"^\s*from\s*\w+\s*import\s*\(([\w, \t]*(\s*\\*\s*\r*\n*))*\)",
"",
subs,
flags=re.M | re.S,
)
# from sys import path
#
# from sys import path, \
# argv
subs = re.sub(
r"^\s*from\s*\w+\s*import\s+((?!class)[\w, \t]*(\s*\\*\s*\r*\n*)*)*",
"",
subs,
flags=re.M | re.S,
)
return subs
def one_expr(self, file_path: str, mapping: dict) -> dict:
jv = {
"name": os.path.splitext(os.path.basename(file_path))[0],
"value": open(file_path).read().strip(),
"type": "expression",
"mapping": mapping,
}
return jv
def one_rule(self, file_path: str) -> dict:
jv = {
"name": os.path.splitext(os.path.basename(file_path))[0],
"value": "",
"type": "script",
}
ret = []
fcontent = open(file_path).read()
fcontent = self.__remove_import(fcontent)
for line in fcontent.splitlines():
ls = line.strip()
if not ls:
continue
if ls.startswith("#"):
continue
ret.append(line)
jv["value"] = "\n".join(ret)
return jv
def __process_dir(self, dirpath: str) -> list:
rule_list = []
# Rules
for f in os.listdir(dirpath):
if f.endswith("_rule.py"):
rule = self.one_rule(os.path.join(dirpath, f))
mapping_file = os.path.join(
dirpath, "{}.json".format(os.path.splitext(f)[0])
)
if os.path.isfile(mapping_file):
rule["mapping"] = json.loads(open(mapping_file).read())
rule_list.append(rule)
# Expression
for f in os.listdir(dirpath):
if f.endswith(".expr"):
mapping_file = os.path.join(
dirpath, "{}.json".format(os.path.splitext(f)[0])
)
if os.path.isfile(mapping_file):
mapping = json.loads(open(mapping_file).read())
rule_list.append(self.one_expr(os.path.join(dirpath, f), mapping))
else:
print(f"[W] Mapping table of {f} is missing!")
return rule_list
def __dataframe(self, task_url: str, task_id: str, step_num: int = 100):
# Download the dataframes
start_time = ""
frame_id = 0
ntfid = 0
has_data = False
timeout = 0.0
while True:
start = time.time()
succ, error_msg, data = True, "", {}
if succ:
# If can not get data, wait forever
if not data:
self.__waiter.wait()
if not has_data:
if DEBUG:
print(
f"[{get_timestamp()}] [W] No data from remote URL"
)
else: # Calculate the timeout
# If we find the last frame == current frame, terminal evaluation process immediately
tsucc, ntfid = self.__check_last_frame()
if tsucc and ntfid == frame_id:
if DEBUG:
print(
f"[{get_timestamp()}] [D] Get the last data frame"
)
break
timeout += 0.0
if timeout >= self.__timeout:
if DEBUG:
print(
f"[{get_timestamp()}] [W] Timeout"
)
break
continue
else:
has_data = True
# Reset the timer
timeout = 0
self.__waiter.reset()
self.__retry_waiter.reset()
yield from data
if len(data) == 0:
continue
last_frame = data[-1]
if "frameDataVO" in last_frame and last_frame["frameDataVO"]:
if last_frame["frameDataVO"]["is_end_flag"]:
if DEBUG:
print(
f"[{get_timestamp()}] [DEBUG] Retrieve the last frame"
)
break
start_time = int(last_frame["frameDataVO"]["traffic_timestamp"])
else:
print(
f"[{get_timestamp()}] [W] Invalid traffic frame(miss frameDataVO)"
)
frame_id = int(last_frame["frameId"]) + 1
else:
# If failed to get data from server,
# usually, some connection exceptions occurred,
# just wait a moment and retry later.
print(f"[E] Failed to get dataframe {error_msg}!")
if not self.__retry_waiter.retry_failure():
self.__retry_waiter.wait()
continue
yield from []
break
def __generate_rule_scripts(self) -> list:
rule_scripts = []
for rule in self.__rules:
if rule["type"] == "script":
rule_scripts.append(rule)
mapping = {}
if "mapping" in rule:
for k, v in rule["mapping"].items():
mapping[k] = json.loads(v)
succ, error_msg = ScriptEngine.check_script(
rule["value"], names=mapping
)
if not succ:
print("[E] Check Rule:", rule["name"], error_msg)
return []
return rule_scripts
def do_statistics_online(self, task_url: str, task_id: str, output_file_path: str):
se = ScriptEngine(debug=True)
rule_scripts = self.__generate_rule_scripts()
succ, error_msg = se.compile(rule_scripts)
if not succ:
print("[E] Compiling:", error_msg)
return
succ, error_msg = se.do_preprocess()
if not succ:
print(error_msg)
return
count = 0
for df in self.__dataframe(task_url, task_id):
succ, error_msg = se.do_step(df)
if not succ:
print(f"[E] {error_msg}")
return
count += 1
succ, error_msg = se.do_postprocess()
if not succ:
print(f"[E] {error_msg}")
return
with open(output_file_path, "w+") as f:
f.seek(0, 0)
json.dump(se.get_output_var(), f, indent=2)
f.truncate()
def do_statistics_offline(self, dataframe_file_path: str, output_file_path: str):
if not os.path.isfile(dataframe_file_path):
print("[E] Dataframe file does not exist: {}".format(dataframe_file_path))
return
data = json.loads(open(dataframe_file_path).read())
dfs = data["result"]["data"]
se = ScriptEngine(debug=True)
rule_scripts = self.__generate_rule_scripts()
succ, error_msg = se.compile(rule_scripts)
if not succ:
print("[E] Compiling:", error_msg)
return
succ, error_msg = se.do_preprocess()
if not succ:
print(error_msg)
return
count = 0
for df in dfs:
succ, error_msg = se.do_step(df)
if not succ:
print(f"[E] {error_msg}")
return
count += 1
succ, error_msg = se.do_postprocess()
if not succ:
print(f"[E] {error_msg}")
return
r = se.get_output_var()
with open(output_file_path, "w+") as f:
f.seek(0, 0)
json.dump(r, f, indent=2)
f.truncate()
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Test: python3 script_loader.py <script_dir>")
sys.exit(1)
sl = ScriptLoader()
print(json.dumps(sl.load(sys.argv[1]), indent=2))
if len(sys.argv) < 4:
print(
"If you want to generate offline report, please append two arugments: <input dataframes> <output file>"
)
sys.exit(0)
sl.do_statistics_offline(sys.argv[2], sys.argv[3])

View File

@ -0,0 +1,46 @@
#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# Version 2, December 2004
#
# Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
#
# Everyone is permitted to copy and distribute verbatim or modified
# copies of this license document, and changing it is allowed as long
# as the name is changed.
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
#
# 0. You just DO WHAT THE FUCK YOU WANT TO.
#
# Copyright (c) 2022- donkey <anjingyu_ws@foxmail.com>
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
__all__ = ["StatusBar"]
class StatusBar(ttk.Label):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.config(text="Status: ", bootstyle="success", relief=RAISED, anchor=W)
def show_info(self, info):
self.config(text=f"Status: {info}", bootstyle="success")
def show_warning(self, warning):
self.config(text=f"Status: {warning}", bootstyle="warning")
def show_status(self, status):
self.config(text=f"Status: {status}", bootstyle="info")
def show_error(self, error):
self.config(text=f"Status: {error}", bootstyle="danger")
def reset(self):
self.config(text="Status: ", foreground="success")

21
rule-editor/run.bat Normal file
View File

@ -0,0 +1,21 @@
@echo off
setlocal EnableDelayedExpansion
REM Start Rule Script Editor
REM
REM Author: donkey <anjingyu_ws@foxmail.com>
set cur_dir=%~dp0
pushd %cur_dir% 1>nul 2>&1
set PYTHONPATH="%cur_dir%"
set py_cmd=python3
where %py_cmd% > nul 2>&1
if %ERRORLEVEL% equ 0 (
python3 %cur_dir%\main.py
) else (
python %cur_dir%\main.py
)
popd 1>nul 2>&1

14
rule-editor/run.sh Executable file
View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
#
# Start Rule Script Editor
#
# Author: donkey <anjingyu_ws@foxmail.com>
readonly CUR_DIR=$(cd `dirname $0`; pwd)
pushd ${CUR_DIR} 1>/dev/null 2>&1
PYTHONPATH="${CUR_DIR}" python3 main.py
popd 1>/dev/null 2>&1