-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
93 lines (69 loc) · 2.52 KB
/
cli.py
File metadata and controls
93 lines (69 loc) · 2.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""Developer CLI for manual triggers, keyword management, and DB resets."""
from __future__ import annotations
import json
import os
import sys
import click
from dotenv import load_dotenv
load_dotenv()
from monitor import db, pipeline, search
@click.group()
def cli() -> None:
    """Google News API monitor — manual control."""
    # Root command group: it does no work of its own. Subcommands attach to it
    # through the ``@cli.command()`` decorators, and click prints the docstring
    # above as the top-level ``--help`` text, so keep that wording user-facing.
@cli.command()
def init() -> None:
    """Create data.db and tables if missing."""
    # Set up the database first, then report the path the db module is using.
    db.init_db()
    db_location = db.DB_PATH
    click.echo(f"Initialized DB at {db_location}")
@cli.command()
@click.confirmation_option(prompt="Delete data.db and all monitored data?")
def reset() -> None:
    """Wipe the local database. Cannot be undone."""
    # The confirmation option above gates this body: click asks the prompt
    # (or accepts ``--yes``) and aborts before we get here if declined.
    #
    # Remove the file and tolerate its absence instead of testing
    # os.path.exists() first: the check-then-remove form can still raise if
    # the file disappears between the two calls. Any other OSError (for
    # example a permissions problem) is deliberately left to propagate.
    try:
        os.remove(db.DB_PATH)
    except FileNotFoundError:
        pass  # nothing on disk to wipe; just create a fresh database below
    db.init_db()
    click.echo("Database reset.")
@cli.command(name="add")
@click.argument("keyword")
@click.option("--no-monitor", is_flag=True, help="Add but don't track daily.")
def add_kw(keyword: str, no_monitor: bool) -> None:
    """Add a keyword to track."""
    # The CLI flag is phrased negatively; flip it once so the stored value and
    # the confirmation message are guaranteed to agree.
    monitored = not no_monitor
    db.add_keyword(keyword, monitored=monitored)
    click.echo(f"Added keyword: {keyword} (monitored={monitored})")
@cli.command(name="list")
def list_kws() -> None:
    """List tracked keywords."""
    # One line per keyword; the bracketed marker is a check mark when the
    # entry's "monitored" value is truthy and a blank otherwise.
    for entry in db.list_keywords():
        marker = " "
        if entry["monitored"]:
            marker = "✓"
        click.echo(f" [{marker}] {entry['keyword']}")
@cli.command()
@click.argument("keyword")
def remove(keyword: str) -> None:
    """Stop tracking a keyword."""
    # Delegate the deletion to the db layer, then confirm to the user.
    db.remove_keyword(keyword)
    confirmation = f"Removed: {keyword}"
    click.echo(confirmation)
# Pin the CLI name explicitly, the same way the ``add`` and ``list`` commands
# do. Left to click, the name is derived from the function name, and that
# derivation changed in click 8.2 (a trailing ``_cmd`` is now stripped), so the
# command would be ``search-cmd`` or ``search`` depending on the installed
# click version. The function itself cannot simply be called ``search``
# because that would rebind the imported ``search`` module used below.
@cli.command(name="search")
@click.argument("keyword")
@click.option("--num", default=50, help="Max results to fetch from Google News.")
@click.option("--when", default="1d", help="Time range — 1h, 1d, 7d, 1m, 1y, or '' for any time.")
@click.option("--gl", default=None, help="2-letter country code (us, gb, de...).")
def search_cmd(keyword: str, num: int, when: str, gl: str | None) -> None:
    """Run the full pipeline once for KEYWORD."""
    # Show the query string built for this keyword before running anything.
    click.echo(f"Query: {search.build_query(keyword)}")
    result = pipeline.process_keyword(
        keyword,
        num=num,
        # An empty ``--when ''`` means "any time"; hand the pipeline None
        # rather than an empty string in that case.
        when=(when or None),
        gl=gl,
    )
    # The pipeline result is printed as indented JSON.
    click.echo(json.dumps(result, indent=2))
@cli.command()
def cron() -> None:
    """Trigger the daily monitoring job for all monitored keywords now."""
    # Run the job synchronously and dump whatever it returns as indented JSON.
    click.echo(json.dumps(pipeline.run_all_monitored(), indent=2))
@cli.command()
@click.argument("keyword")
@click.option("--period", type=click.Choice(["daily", "weekly", "monthly"]), default="weekly")
def report(keyword: str, period: str) -> None:
    """Print a structured period report."""
    # ``period`` is already restricted to daily/weekly/monthly by click.Choice,
    # so it is forwarded to the pipeline as-is.
    summary = pipeline.period_report(keyword, period=period)
    rendered = json.dumps(summary, indent=2)
    click.echo(rendered)
# Dispatch to the click command group only when this file is executed as a
# script; importing the module must not run any command.
if __name__ == "__main__":
    cli()