"""Watch-and-test harness for the xmake-built ``sql-parser`` target.

On start-up, and again whenever something under ``src`` changes, the project
is rebuilt with ``xmake`` and a fixed list of SQL statements is fed to
``xmake run sql-parser``; the JSON printed by the parser is compared with the
expected structure. Editing this file itself re-executes the script.
"""

import asyncio
import asyncio.subprocess as subprocess
import os
import sys
import time
from datetime import datetime
from pathlib import Path

import orjson
from termcolor import colored
from watchdog.events import FileSystemEventHandler
from watchdog.events import LoggingEventHandler
from watchdog.observers import Observer
from watchfiles import awatch, watch

# Where the most recent parser result is dumped for manual inspection.
_DUMP_PATH = Path("/tmp/temp/test.py")

# (sql, expected parser output) pairs, checked in order by assert_sqls().
_SQL_CASES = [
    (
        "create table asd;",
        [{"type": "create_table", "table_name": "asd", "cols": []}],
    ),
    (
        "create table tb (col1 INT, col2 string, col3 FLOAT);",
        [
            {
                "type": "create_table",
                "table_name": "tb",
                "cols": [
                    {
                        "type": "create_column",
                        "column_name": "col1",
                        "data_type": "INT",
                        "primary_key": False,
                    },
                    {
                        "type": "create_column",
                        "column_name": "col2",
                        "data_type": "string",
                        "primary_key": False,
                    },
                    {
                        "type": "create_column",
                        "column_name": "col3",
                        "data_type": "FLOAT",
                        "primary_key": False,
                    },
                ],
            }
        ],
    ),
    (
        """ create table tb1 ( col1 int primary key, col2 FLOAT ); """,
        [
            {
                "type": "create_table",
                "table_name": "tb1",
                "cols": [
                    {
                        "type": "create_column",
                        "column_name": "col1",
                        "data_type": "int",
                        "primary_key": True,
                    },
                    {
                        "type": "create_column",
                        "column_name": "col2",
                        "data_type": "FLOAT",
                        "primary_key": False,
                    },
                ],
            }
        ],
    ),
    (
        """ create table tb2 ( x float, y int, z int ); """,
        [
            {
                "type": "create_table",
                "table_name": "tb2",
                "cols": [
                    {
                        "type": "create_column",
                        "column_name": "x",
                        "data_type": "float",
                        "primary_key": False,
                    },
                    {
                        "type": "create_column",
                        "column_name": "y",
                        "data_type": "int",
                        "primary_key": False,
                    },
                    {
                        "type": "create_column",
                        "column_name": "z",
                        "data_type": "int",
                        "primary_key": False,
                    },
                ],
            }
        ],
    ),
    (
        """insert into tb1 values (1, 'foo');""",
        [
            {
                "type": "insert",
                "table_name": "tb1",
                "values": [
                    {"type": "int", "value": 1},
                    {"type": "string", "value": "'foo'"},
                ],
            }
        ],
    ),
    (
        """insert into tb1 values (2, 'foo', 'zxc', 1234.234);""",
        [
            {
                "type": "insert",
                "table_name": "tb1",
                "values": [
                    {"type": "int", "value": 2},
                    {"type": "string", "value": "'foo'"},
                    {"type": "string", "value": "'zxc'"},
                    {"type": "float", "value": 1234.234},
                ],
            }
        ],
    ),
    (
        "update tb1 set col1=3, col4=4 where col1=2 and col2=4;",
        [
            {
                "type": "update",
                "table_name": "tb1",
                "set": [
                    {
                        "type": "assign_const",
                        "left": {"type": "identifier", "value": "col1"},
                        "right": {"type": "int", "value": 3},
                    },
                    {
                        "type": "assign_const",
                        "left": {"type": "identifier", "value": "col4"},
                        "right": {"type": "int", "value": 4},
                    },
                ],
                "where": {
                    "type": "且",
                    "left": {
                        "type": "相等",
                        "left": {"type": "identifier", "value": "col1"},
                        "right": {"type": "int", "value": 2},
                    },
                    "right": {
                        "type": "相等",
                        "left": {"type": "identifier", "value": "col2"},
                        "right": {"type": "int", "value": 4},
                    },
                },
            }
        ],
    ),
    (
        "update tb1 set col1=3, col4=4 where not not not col1=2 and col2=4 or col3=col2;",
        [
            {
                "type": "update",
                "table_name": "tb1",
                "set": [
                    {
                        "type": "assign_const",
                        "left": {"type": "identifier", "value": "col1"},
                        "right": {"type": "int", "value": 3},
                    },
                    {
                        "type": "assign_const",
                        "left": {"type": "identifier", "value": "col4"},
                        "right": {"type": "int", "value": 4},
                    },
                ],
                "where": {
                    "type": "或",
                    "left": {
                        "type": "且",
                        "left": {
                            "type": "非",
                            "left": {
                                "type": "非",
                                "left": {
                                    "type": "非",
                                    "left": {
                                        "type": "相等",
                                        "left": {
                                            "type": "identifier",
                                            "value": "col1",
                                        },
                                        "right": {"type": "int", "value": 2},
                                    },
                                },
                            },
                        },
                        "right": {
                            "type": "相等",
                            "left": {"type": "identifier", "value": "col2"},
                            "right": {"type": "int", "value": 4},
                        },
                    },
                    "right": {
                        "type": "相等",
                        "left": {"type": "identifier", "value": "col3"},
                        "right": {"type": "identifier", "value": "col2"},
                    },
                },
            }
        ],
    ),
    (
        "delete from tb1 where c1 = 1 and c2= 3 or c3=3;",
        [
            {
                "type": "delete",
                "table_name": "tb1",
                "where": {
                    "type": "或",
                    "left": {
                        "left": {
                            "type": "相等",
                            "left": {"type": "identifier", "value": "c1"},
                            "right": {"type": "int", "value": 1},
                        },
                        "type": "且",
                        "right": {
                            "type": "相等",
                            "left": {"type": "identifier", "value": "c2"},
                            "right": {"type": "int", "value": 3},
                        },
                    },
                    "right": {
                        "type": "相等",
                        "left": {"type": "identifier", "value": "c3"},
                        "right": {"type": "int", "value": 3},
                    },
                },
            }
        ],
    ),
    (
        "delete from tb1 where c1 = 1 and (c2= 3 or c3=3) or (c4='asd');",
        [
            {
                "type": "delete",
                "table_name": "tb1",
                "where": {
                    "type": "或",
                    "left": {
                        "type": "且",
                        "left": {
                            "type": "相等",
                            "left": {"type": "identifier", "value": "c1"},
                            "right": {"type": "int", "value": 1},
                        },
                        "right": {
                            "type": "或",
                            "left": {
                                "type": "相等",
                                "left": {"type": "identifier", "value": "c2"},
                                "right": {"type": "int", "value": 3},
                            },
                            "right": {
                                "type": "相等",
                                "left": {"type": "identifier", "value": "c3"},
                                "right": {"type": "int", "value": 3},
                            },
                        },
                    },
                    "right": {
                        "type": "相等",
                        "left": {"type": "identifier", "value": "c4"},
                        "right": {"type": "string", "value": "'asd'"},
                    },
                },
            }
        ],
    ),
    (
        "select * from t2;",
        [
            {
                "type": "select",
                "select_cols": [{"type": "select_all_column"}],
                "table_name": "t2",
                "where": {},
            }
        ],
    ),
    (
        "select c2 as t from t2 where col1>2;",
        [
            {
                "type": "select",
                "select_cols": [
                    {
                        "type": "select_column",
                        "target": {"type": "identifier", "value": "c2"},
                        "alias": "t",
                    }
                ],
                "table_name": "t2",
                "where": {
                    "type": "大于",
                    "left": {"type": "identifier", "value": "col1"},
                    "right": {"type": "int", "value": 2},
                },
            }
        ],
    ),
    (
        """SELECT Sname FROM Student WHERE Sno IN (1,2) and c in (3, 4, 5);""",
        [
            {
                "type": "select",
                "select_cols": [
                    {
                        "type": "select_column",
                        "target": {"type": "identifier", "value": "Sname"},
                    }
                ],
                "table_name": "Student",
                "where": {
                    "type": "且",
                    "left": {
                        "type": "包含于",
                        "left": {"type": "identifier", "value": "Sno"},
                        "right": [
                            {"type": "int", "value": 1},
                            {"type": "int", "value": 2},
                        ],
                    },
                    "right": {
                        "type": "包含于",
                        "left": {"type": "identifier", "value": "c"},
                        "right": [
                            {"type": "int", "value": 3},
                            {"type": "int", "value": 4},
                            {"type": "int", "value": 5},
                        ],
                    },
                },
            }
        ],
    ),
    (
        """SELECT Sname FROM Student WHERE Sno IN ( SELECT Sno FROM SC WHERE Cno='81003' );""",
        [
            {
                "type": "select",
                "select_cols": [
                    {
                        "type": "select_column",
                        "target": {"type": "identifier", "value": "Sname"},
                    }
                ],
                "table_name": "Student",
                "where": {
                    "type": "包含于",
                    "left": {"type": "identifier", "value": "Sno"},
                    "right": {
                        "type": "select",
                        "select_cols": [
                            {
                                "type": "select_column",
                                "target": {"type": "identifier", "value": "Sno"},
                            }
                        ],
                        "table_name": "SC",
                        "where": {
                            "type": "相等",
                            "left": {"type": "identifier", "value": "Cno"},
                            "right": {"type": "string", "value": "'81003'"},
                        },
                    },
                },
            }
        ],
    ),
]


async def _communicate(proc, timeout=None):
    """Return ``(stdout, stderr)`` of *proc*, killing it if the wait is cut short.

    Raises ``asyncio.TimeoutError`` when *timeout* seconds elapse first.
    """
    try:
        return await asyncio.wait_for(proc.communicate(), timeout=timeout)
    finally:
        # Reached with a live process only on timeout or cancellation; without
        # this the child would keep running after we stopped listening to it.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # already gone
            await proc.wait()


async def assert_sql(sql, target):
    """Run ``xmake run sql-parser <sql>`` and check its JSON output.

    The decoded output (or an error record when it is not valid JSON) is also
    written to ``/tmp/temp/test.py`` together with the parser's stderr, so the
    latest result can be inspected by hand.

    Args:
        sql: SQL text passed to the parser as a single argument.
        target: Expected decoded JSON value.

    Raises:
        AssertionError: stdout contains ``error`` or the output differs from
            *target*.
        asyncio.TimeoutError: the parser did not finish within 5 seconds.
    """
    p = await subprocess.create_subprocess_exec(
        "xmake",
        "run",
        "sql-parser",
        sql,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = await _communicate(p, timeout=5)

    if b"error" in stdout:
        print(stdout.decode("utf-8"))
        print(datetime.now(), "-" * 40)
        print(f'other: {colored(stderr.decode("utf-8"), "yellow")}')
        raise AssertionError("sql-parser error")

    try:
        output = orjson.loads(stdout)
    except orjson.JSONDecodeError as e:
        # Keep the record serializable: orjson cannot dump an exception object.
        output = {"error": str(e), "output": stdout.decode("utf-8")}

    # stderr is appended with every line prefixed by "# ".
    _DUMP_PATH.parent.mkdir(parents=True, exist_ok=True)
    _DUMP_PATH.write_bytes(
        f'"{sql}"\n\n'.encode("utf-8")
        + orjson.dumps(output, option=orjson.OPT_INDENT_2)
        + (b"\n\n" + stderr).replace(b"\n", b"\n# ")
    )

    # Raised explicitly rather than via ``assert`` so the check survives -O.
    if output != target:
        raise AssertionError(
            f"""{colored("sql-parser error", "red")} input: {colored(sql, "yellow")} expect: {colored(target, "green")} actual: {colored(output, "red")} other: {colored(stderr.decode("utf-8"), "yellow")} """
        )


async def assert_sqls():
    """Check every ``_SQL_CASES`` entry in order, stopping at the first failure."""
    for sql, expected in _SQL_CASES:
        await assert_sql(sql, expected)


async def on_modified(event):
    """Rebuild with ``xmake`` and, if the build output has no error, run the tests.

    Args:
        event: The change set that triggered the run; not inspected.
    """
    p = await subprocess.create_subprocess_shell(
        "xmake", stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    stdout, _ = await _communicate(p)
    if b"error" in stdout:
        print(stdout.decode("utf-8"))
        print(datetime.now(), "-" * 40)
        return
    try:
        await assert_sqls()
    except Exception as e:
        print(e)
    else:
        print(datetime.now(), colored("all tests right!", "green"))


async def restart():
    """Re-execute this script whenever its own file changes."""
    script = Path(__file__).as_posix()
    # Use the interpreter that is already running so the same environment is
    # picked up; fall back to the historical path if it is unknown.
    python = sys.executable or "/bin/python"
    async for _ in awatch(__file__):
        print("restart")
        # exec replaces the process image without flushing buffered output.
        sys.stdout.flush()
        sys.stderr.flush()
        os.execl(python, python, script)


async def watch_src():
    """Rebuild and retest on every change under ``src`` (10 s budget per run)."""
    async for changes in awatch("src"):
        print(datetime.now(), "re run...")
        try:
            await asyncio.wait_for(on_modified(changes), 10)
        except asyncio.TimeoutError:
            # One slow run must not end the watch loop.
            print(datetime.now(), colored("run timed out", "red"))


async def main():
    """Run the tests once, then watch this file and ``src`` concurrently."""
    try:
        await assert_sqls()
    except Exception as e:
        print(e)
    await asyncio.gather(restart(), watch_src())


if __name__ == "__main__":
    asyncio.run(main())