Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions basic-custom-agent/cookiecutter.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"project_name": "Basic custom agent"
}
33 changes: 33 additions & 0 deletions basic-custom-agent/{{cookiecutter.project_name}}/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
FROM python:3.10-slim-bookworm AS agent-slim

ARG VERSION

# Install required dependencies
RUN apt-get update && apt-get install -y \
build-essential \
git \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

# Install Python dependencies
RUN pip install --no-cache-dir \
prometheus-client \
grpcio-health-checking==1.67.1

# Install Flytekit from GitHub
RUN pip install --no-cache-dir git+https://github.com/flyteorg/flytekit.git@master

# Copy and install the bigquery plugin
COPY flytekit-bigquery /flytekit-bigquery
RUN pip install --no-cache-dir /flytekit-bigquery

COPY flytekit-openai /flytekit-openai
RUN pip install --no-cache-dir /flytekit-openai

# Cleanup
RUN apt-get purge -y build-essential git \
&& apt-get autoremove -y \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

# Set the default command
CMD ["pyflyte", "serve", "agent", "--port", "8000"]
40 changes: 40 additions & 0 deletions basic-custom-agent/{{cookiecutter.project_name}}/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# flyte-custom-agent-template
How to write your custom agent and build it with a Dockerfile.

## Concepts
1. flytekit will load plugin [here](https://github.com/flyteorg/flytekit/blob/ff2d0da686c82266db4dbf764a009896cf062349/flytekit/__init__.py#L322-L323),
so you must add your plugin to `entry_points` in [setup.py](https://github.com/Future-Outlier/flyte-custom-agent-template/blob/main/flytekit-bigquery/setup.py#L39).
2. Agent registration is triggered by loading the plugin. For example,
BigQuery's agent registration is triggered [here](https://github.com/Future-Outlier/flyte-custom-agent/blob/main/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L97)

## Build your custom agent
1. Following the folder structure in this repo, you can build your custom agent.
2. Build your own custom agent ([learn more](https://docs.flyte.org/en/latest/user_guide/flyte_agents/developing_agents.html))

> In the following command, `localhost:30000` is the Docker registry that ships with the Flyte demo cluster. Use it or replace it with a registry where you have push permissions.

```bash
docker buildx build --platform linux/amd64 -t localhost:30000/flyteagent:custom-agent -f Dockerfile .
```

3. Test the image:
```bash
docker run -it localhost:30000/flyteagent:custom-agent
```

4. Check the logs (sensor is created by flytekit, bigquery and openai is created by the custom agent)
```
(dev) future@outlier ~ % docker run -it localhost:30000/flyteagent:custom-agent

WARNING: The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested
🚀 Starting the agent service...
Starting up the server to expose the prometheus metrics...
Agent Metadata
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ Agent Name ┃ Support Task Types ┃ Is Sync ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ Sensor │ sensor (v0) │ False │
│ ChatGPT Agent │ chatgpt (v0) │ True │
│ Bigquery Agent │ bigquery_query_job_task (v0) │ False │
└────────────────┴───────────────────────────────┴─────────┘
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Flytekit BigQuery Plugin

BigQuery enables us to build data-intensive applications without operational burden. Flyte backend can be connected with the BigQuery service. Once enabled, it can allow you to query a BigQuery table.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-bigquery
```

To configure BigQuery in the Flyte deployment's backend, follow the [configuration guide](https://docs.flyte.org/en/latest/deployment/plugin_setup/gcp/bigquery.html#deployment-plugin-setup-gcp-bigquery).
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
.. currentmodule:: flytekitplugins.bigquery

This package contains things that are useful when extending Flytekit.

.. autosummary::
:template: custom.rst
:toctree: generated/

BigQueryConfig
BigQueryTask
BigQueryAgent
"""

from .agent import BigQueryAgent
from .task import BigQueryConfig, BigQueryTask
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import datetime
from dataclasses import dataclass
from typing import Dict, Optional

from flyteidl.core.execution_pb2 import TaskExecution, TaskLog
from google.cloud import bigquery

from flytekit import FlyteContextManager, StructuredDataset, logger
from flytekit.core.type_engine import TypeEngine
from flytekit.extend.backend.base_agent import AgentRegistry, AsyncAgentBase, Resource, ResourceMeta
from flytekit.extend.backend.utils import convert_to_flyte_phase
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate

pythonTypeToBigQueryType: Dict[type, str] = {
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#data_type_sizes
list: "ARRAY",
bool: "BOOL",
bytes: "BYTES",
datetime.datetime: "DATETIME",
float: "FLOAT64",
int: "INT64",
str: "STRING",
}


@dataclass
class BigQueryMetadata(ResourceMeta):
job_id: str
project: str
location: str


class BigQueryAgent(AsyncAgentBase):
name = "Bigquery Agent"

def __init__(self):
super().__init__(task_type_name="bigquery_query_job_task", metadata_type=BigQueryMetadata)

def create(
self,
task_template: TaskTemplate,
inputs: Optional[LiteralMap] = None,
**kwargs,
) -> BigQueryMetadata:
job_config = None
if inputs:
ctx = FlyteContextManager.current_context()
python_interface_inputs = {
name: TypeEngine.guess_python_type(lt.type) for name, lt in task_template.interface.inputs.items()
}
native_inputs = TypeEngine.literal_map_to_kwargs(ctx, inputs, python_interface_inputs)
logger.info(f"Create BigQuery job config with inputs: {native_inputs}")
job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter(name, pythonTypeToBigQueryType[python_interface_inputs[name]], val)
for name, val in native_inputs.items()
]
)

custom = task_template.custom
project = custom["ProjectID"]
location = custom["Location"]
client = bigquery.Client(project=project, location=location)
query_job = client.query(task_template.sql.statement, job_config=job_config)

return BigQueryMetadata(job_id=str(query_job.job_id), location=location, project=project)

def get(self, resource_meta: BigQueryMetadata, **kwargs) -> Resource:
client = bigquery.Client()
log_link = TaskLog(
uri=f"https://console.cloud.google.com/bigquery?project={resource_meta.project}&j=bq:{resource_meta.location}:{resource_meta.job_id}&page=queryresults",
name="BigQuery Console",
)

job = client.get_job(resource_meta.job_id, resource_meta.project, resource_meta.location)
if job.errors:
logger.error("failed to run BigQuery job with error:", job.errors.__str__())
return Resource(phase=TaskExecution.FAILED, message=job.errors.__str__(), log_links=[log_link])

cur_phase = convert_to_flyte_phase(str(job.state))
res = None

if cur_phase == TaskExecution.SUCCEEDED:
dst = job.destination
if dst:
output_location = f"bq://{dst.project}:{dst.dataset_id}.{dst.table_id}"
res = {"results": StructuredDataset(uri=output_location)}

return Resource(phase=cur_phase, message=str(job.state), log_links=[log_link], outputs=res)

def delete(self, resource_meta: BigQueryMetadata, **kwargs):
client = bigquery.Client()
client.cancel_job(resource_meta.job_id, resource_meta.project, resource_meta.location)


AgentRegistry.register(BigQueryAgent())
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, Type

from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct

from flytekit import lazy_module
from flytekit.configuration import SerializationSettings
from flytekit.extend import SQLTask
from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin
from flytekit.models import task as _task_model
from flytekit.types.structured import StructuredDataset

bigquery = lazy_module("google.cloud.bigquery")


@dataclass
class BigQueryConfig(object):
"""
BigQueryConfig should be used to configure a BigQuery Task.
"""

ProjectID: str
Location: Optional[str] = None
QueryJobConfig: Optional[bigquery.QueryJobConfig] = None


class BigQueryTask(AsyncAgentExecutorMixin, SQLTask[BigQueryConfig]):
"""
This is the simplest form of a BigQuery Task, that can be used even for tasks that do not produce any output.
"""

# This task is executed using the BigQuery handler in the backend.
# https://github.com/flyteorg/flyteplugins/blob/43623826fb189fa64dc4cb53e7025b517d911f22/go/tasks/plugins/webapi/bigquery/plugin.go#L34
_TASK_TYPE = "bigquery_query_job_task"

def __init__(
self,
name: str,
query_template: str,
task_config: BigQueryConfig,
inputs: Optional[Dict[str, Type]] = None,
output_structured_dataset_type: Optional[Type[StructuredDataset]] = None,
**kwargs,
):
"""
To be used to query BigQuery Tables.

:param name: Name of this task, should be unique in the project
:param query_template: The actual query to run. We use Flyte's Golang templating format for Query templating. Refer to the templating documentation
:param task_config: BigQueryConfig object
:param inputs: Name and type of inputs specified as an ordered dictionary
:param output_structured_dataset_type: If some data is produced by this query, then you can specify the output StructuredDataset type
:param kwargs: All other args required by Parent type - SQLTask
"""
outputs = None
if output_structured_dataset_type is not None:
outputs = {
"results": output_structured_dataset_type,
}
super().__init__(
name=name,
task_config=task_config,
query_template=query_template,
inputs=inputs,
outputs=outputs,
task_type=self._TASK_TYPE,
**kwargs,
)
self._output_structured_dataset_type = output_structured_dataset_type

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
config = {
"Location": self.task_config.Location,
"ProjectID": self.task_config.ProjectID,
}
if self.task_config.QueryJobConfig is not None:
config.update(self.task_config.QueryJobConfig.to_api_repr()["query"])
s = Struct()
s.update(config)
return json_format.MessageToDict(s)

def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]:
sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)
return sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from setuptools import setup

PLUGIN_NAME = "bigquery"

microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = [
"flytekit>1.10.7",
"google-cloud-bigquery>=3.21.0",
"google-cloud-bigquery-storage>=2.25.0",
"flyteidl>1.10.7",
]

__version__ = "0.0.0+develop"

setup(
name=microlib_name,
version=__version__,
author="flyteorg",
author_email="admin@flyte.org",
description="This package holds the Bigquery plugins for flytekit",
namespace_packages=["flytekitplugins"],
packages=[f"flytekitplugins.{PLUGIN_NAME}"],
install_requires=plugin_requires,
license="apache2",
python_requires=">=3.9",
classifiers=[
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Topic :: Scientific/Engineering",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Software Development",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
],
entry_points={"flytekit.plugins": [f"{PLUGIN_NAME}=flytekitplugins.{PLUGIN_NAME}"]},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# OpenAI Plugins

The plugin currently features ChatGPT and Batch API agents.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-openai
```

## ChatGPT

The ChatGPT plugin allows you to run ChatGPT tasks within the Flyte workflow without requiring any code changes.

```python
from flytekit import task, workflow
from flytekitplugins.openai import ChatGPTTask, ChatGPTConfig

chatgpt_small_job = ChatGPTTask(
name="chatgpt gpt-3.5-turbo",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-3.5-turbo",
"temperature": 0.7,
},
)

chatgpt_big_job = ChatGPTTask(
name="chatgpt gpt-4",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-4",
"temperature": 0.7,
},
)


@workflow
def wf(message: str) -> str:
message = chatgpt_small_job(message=message)
message = chatgpt_big_job(message=message)
return message


if __name__ == "__main__":
print(wf(message="hi"))
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
.. currentmodule:: flytekitplugins.openai

.. autosummary::
:template: custom.rst
:toctree: generated/

DownloadJSONFilesTask
UploadJSONLFileTask
OpenAIFileConfig
ChatGPTAgent
ChatGPTTask
"""

from .chatgpt.agent import ChatGPTAgent
from .chatgpt.task import ChatGPTTask
Loading
Loading