Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ jobs:

- name: Check formatting
run: |
black clickhouse/ --check
black dataloader/ --check

- name: Run linter
run: |
pylint --fail-under=9.5 clickhouse/
pylint --fail-under=9.5 dataloader/

- name: Type check
run: |
mypy clickhouse/
mypy dataloader/
98 changes: 95 additions & 3 deletions dataloader/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class DataLoader:
client: ClassVar[Client] = Manager.get_connection()

@classmethod
def get_data(
def query(
cls,
source: str,
columns_list: Optional[List[str]] = None,
Expand Down Expand Up @@ -165,7 +165,7 @@ def _resolve_columns(
return ", ".join(selected_cols)

@classmethod
def show_tables(cls) -> List[str]:
def tables(cls) -> List[str]:
"""
Returns available tables.
"""
Expand All @@ -174,10 +174,102 @@ def show_tables(cls) -> List[str]:
return df["name"].tolist()

@classmethod
def show_table_column(cls, source: str) -> List[str]:
def fields(cls, source: str) -> List[str]:
"""
Return the column names of a table.

Runs ``SHOW COLUMNS FROM <cls.database>.<source>`` through
``cls.client.query`` and returns the ``field`` column of the result
as a list.

NOTE(review): ``source`` is interpolated into the SQL text as-is;
confirm callers only pass trusted table names.
"""
query = f"SHOW COLUMNS FROM {cls.database}.{source}"
df = cls.client.query(query)
return df["field"].tolist()

@classmethod
def all(cls, source: str) -> pd.DataFrame:
    """
    Fetch the entire table named by ``source``.

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()

@classmethod
def columns(cls, source: str) -> pd.DataFrame:
    """
    Select specific columns from ``source``.

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.

    NOTE(review): the signature has no parameter naming which columns to
    select; confirm the intended interface before implementing.
    """
    raise NotImplementedError()

@classmethod
def head(cls, source: str, n: int = 10) -> pd.DataFrame:
    """
    Get the first ``n`` rows of ``source`` (``n`` defaults to 10).

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()

@classmethod
def paginate(cls, source: str, limit: int, offset: int) -> pd.DataFrame:
    """
    Get one page of results from ``source``.

    ``limit`` and ``offset`` are both required; how they map onto the
    query is not defined yet.

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()

@classmethod
def filter(cls, source: str, **kwargs) -> pd.DataFrame:
    """
    Apply simple equality filters given as ``column=value`` pairs.

    Example:
        DataLoader.filter('equities', symbol='AAPL', date_start='2024-01-01')

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.

    NOTE(review): ``date_start`` in the example does not read like a plain
    column equality; confirm how such keys are meant to be handled.
    """
    raise NotImplementedError()

@classmethod
def match_pattern(cls, source: str, pattern: str) -> List[str]:
    """
    Get the columns of ``source`` that match ``pattern``.

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``. The pattern syntax is not specified here.
    """
    raise NotImplementedError()

@classmethod
def select_pattern(cls, source: str, pattern: str, **filters) -> pd.DataFrame:
    """
    Select the columns matching ``pattern``, with optional ``filters``.

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()

@classmethod
def date_range(
    cls, source: str, start_date: str, end_date: str, **additional_filters
) -> pd.DataFrame:
    """
    Get the data between two dates.

    ``start_date`` and ``end_date`` are strings in YYYY-MM-DD format;
    further keyword arguments are accepted as additional filters.

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()

@classmethod
def first_date(cls, source: str) -> pd.Timestamp:
    """
    Return the earliest date in the table ``source``.

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()

@classmethod
def last_date(cls, source: str) -> pd.Timestamp:
    """
    Return the latest date in the table ``source``.

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()

@classmethod
def latest(cls, source: str, n: int = 1) -> pd.DataFrame:
    """
    Return the last ``n`` rows per symbol or table (``n`` defaults to 1).

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()

@classmethod
def describe(cls, source: str) -> pd.DataFrame:
    """
    Return column types, non-null counts and basic stats for ``source``.

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()

@classmethod
def column_types(cls, source: str) -> Dict[str, str]:
    """
    Return the data type of each column in the table ``source``.

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()

@classmethod
def stream(cls, source: str, batch_size: int = 10000):
    """
    Yield the data of ``source`` in chunks of ``batch_size`` rows.

    Example usage:
        for df_chunk in DataLoader.stream('equities', 5000):
            process(df_chunk)

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError`` immediately; nothing is yielded.
    """
    raise NotImplementedError()

@classmethod
def iter_chunks(cls, source: str, chunk_size: int = 10000):
    """
    Alias for ``stream``.

    Forwards ``source`` unchanged and passes ``chunk_size`` as the
    ``batch_size`` keyword of ``cls.stream``, returning whatever that
    call returns.
    """
    chunks = cls.stream(source, batch_size=chunk_size)
    return chunks

@classmethod
def batch_query(cls, sources: List[str], filters: Optional[Dict[str, Any]] = None):
    """
    Query multiple tables or symbols in a single call.

    ``sources`` lists what to query; ``filters`` is an optional mapping
    that defaults to ``None``.

    Placeholder: the body is not written yet, so every call raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()