✨ Avro - Automatic time conversion by schema registry#9

Open
ignitz wants to merge 1 commit into main from feature/avro/automatic-time-conversion

@ignitz ignitz commented Jun 12, 2024

Trying to make automatic conversions from epoch values to timestamps and dates, driven by the schema from the schema registry.
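For reference, the three Debezium temporal types that appear in the example schema below could be converted roughly as in this sketch. It assumes `io.debezium.time.Date` carries days since the Unix epoch, `io.debezium.time.MicroTimestamp` carries microseconds since the epoch (interpreted here as UTC, which is an assumption because the source column has no time zone), and `io.debezium.time.ZonedTimestamp` carries an ISO-8601 string. The function names are hypothetical and not part of this PR.

```python
from datetime import date, datetime, timedelta, timezone

EPOCH_DATE = date(1970, 1, 1)
EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def debezium_date_to_date(days):
    """io.debezium.time.Date: int, days since 1970-01-01."""
    return EPOCH_DATE + timedelta(days=days)


def micro_timestamp_to_datetime(micros):
    """io.debezium.time.MicroTimestamp: long, microseconds since the epoch.

    Assumption: the value is treated as UTC.
    """
    return EPOCH_UTC + timedelta(microseconds=micros)


def zoned_timestamp_to_datetime(text):
    """io.debezium.time.ZonedTimestamp: ISO-8601 string with an offset.

    datetime.fromisoformat() rejects a trailing "Z" before Python 3.11,
    so it is rewritten as an explicit UTC offset first.
    """
    return datetime.fromisoformat(text.replace("Z", "+00:00"))


print(debezium_date_to_date(19886))                    # 2024-06-12
print(micro_timestamp_to_datetime(1718187330123456))   # 2024-06-12 10:15:30.123456+00:00
print(zoned_timestamp_to_datetime("2024-06-12T10:15:30.123456Z"))
```

Adding a `timedelta` to a fixed epoch avoids float rounding, which `datetime.fromtimestamp(micros / 1e6)` could introduce for microsecond values.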

Example

import json

avro_schema_json = """
{
    "type": "record",
    "name": "Envelope",
    "namespace": "dbserver1.inventory.products",
    "fields": [
        {
            "name": "before",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "Value",
                    "fields": [
                        {
                            "name": "id",
                            "type": {
                                "type": "int",
                                "connect.default": 0
                            },
                            "default": 0
                        },
                        {
                            "name": "name",
                            "type": "string"
                        },
                        {
                            "name": "description",
                            "type": [
                                "null",
                                "string"
                            ],
                            "default": null
                        },
                        {
                            "name": "weight",
                            "type": [
                                "null",
                                "double"
                            ],
                            "default": null
                        },
                        {
                            "name": "date",
                            "type": [
                                "null",
                                {
                                    "type": "int",
                                    "connect.version": 1,
                                    "connect.name": "io.debezium.time.Date"
                                }
                            ],
                            "default": null
                        },
                        {
                            "name": "update_at",
                            "type": [
                                "null",
                                {
                                    "type": "long",
                                    "connect.version": 1,
                                    "connect.name": "io.debezium.time.MicroTimestamp"
                                }
                            ],
                            "default": null
                        },
                        {
                            "name": "timestamp_tz",
                            "type": [
                                "null",
                                {
                                    "type": "string",
                                    "connect.version": 1,
                                    "connect.name": "io.debezium.time.ZonedTimestamp"
                                }
                            ],
                            "default": null
                        },
                        {
                            "name": "timestamp_wtz",
                            "type": [
                                "null",
                                {
                                    "type": "long",
                                    "connect.version": 1,
                                    "connect.name": "io.debezium.time.MicroTimestamp"
                                }
                            ],
                            "default": null
                        },
                        {
                            "name": "timestamptz",
                            "type": [
                                "null",
                                {
                                    "type": "string",
                                    "connect.version": 1,
                                    "connect.name": "io.debezium.time.ZonedTimestamp"
                                }
                            ],
                            "default": null
                        }
                    ],
                    "connect.name": "dbserver1.inventory.products.Value"
                }
            ],
            "default": null
        },
        {
            "name": "after",
            "type": [
                "null",
                "Value"
            ],
            "default": null
        },
        {
            "name": "source",
            "type": {
                "type": "record",
                "name": "Source",
                "namespace": "io.debezium.connector.postgresql",
                "fields": [
                    {
                        "name": "version",
                        "type": "string"
                    },
                    {
                        "name": "connector",
                        "type": "string"
                    },
                    {
                        "name": "name",
                        "type": "string"
                    },
                    {
                        "name": "ts_ms",
                        "type": "long"
                    },
                    {
                        "name": "snapshot",
                        "type": [
                            {
                                "type": "string",
                                "connect.version": 1,
                                "connect.parameters": {
                                    "allowed": "true,last,false,incremental"
                                },
                                "connect.default": "false",
                                "connect.name": "io.debezium.data.Enum"
                            },
                            "null"
                        ],
                        "default": "false"
                    },
                    {
                        "name": "db",
                        "type": "string"
                    },
                    {
                        "name": "sequence",
                        "type": [
                            "null",
                            "string"
                        ],
                        "default": null
                    },
                    {
                        "name": "schema",
                        "type": "string"
                    },
                    {
                        "name": "table",
                        "type": "string"
                    },
                    {
                        "name": "txId",
                        "type": [
                            "null",
                            "long"
                        ],
                        "default": null
                    },
                    {
                        "name": "lsn",
                        "type": [
                            "null",
                            "long"
                        ],
                        "default": null
                    },
                    {
                        "name": "xmin",
                        "type": [
                            "null",
                            "long"
                        ],
                        "default": null
                    }
                ],
                "connect.name": "io.debezium.connector.postgresql.Source"
            }
        },
        {
            "name": "op",
            "type": "string"
        },
        {
            "name": "ts_ms",
            "type": [
                "null",
                "long"
            ],
            "default": null
        },
        {
            "name": "transaction",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "ConnectDefault",
                    "namespace": "io.confluent.connect.avro",
                    "fields": [
                        {
                            "name": "id",
                            "type": "string"
                        },
                        {
                            "name": "total_order",
                            "type": "long"
                        },
                        {
                            "name": "data_collection_order",
                            "type": "long"
                        }
                    ]
                }
            ],
            "default": null
        }
    ],
    "connect.name": "dbserver1.inventory.products.Envelope"
}
"""
avro_schema_dict = json.loads(avro_schema_json)

def extract_special_types(input_dict, target_type):
    """
    Extracts paths from a parsed Avro schema where a type definition has a
    "connect.name" equal to the specified target_type.

    Args:
        input_dict: The Avro schema (as a Python dictionary) to search.
        target_type: The "connect.name" value to search for.

    Returns:
        A list of paths in dot notation where the specified type is found
        (empty if there is no match).
    """

    def traverse(node, path):
        # Unions and field lists: collect the matches from every element.
        if isinstance(node, list):
            results = []
            for item in node:
                results.extend(traverse(item, path))
            return results
        # Primitive type names and named-type references (e.g. "Value")
        # are plain strings and cannot contain a match.
        if not isinstance(node, dict):
            return []
        connector_name = node.get("connect.name")
        if connector_name is not None and connector_name == target_type:
            return [path]
        # Field names extend the path; the wrapper record names do not.
        name = node.get("name")
        if name is not None and name not in ["Envelope", "Value"]:
            path = path + "." + name
        type_field = node.get("type")
        if type_field == "record":
            return traverse(node["fields"], path)
        return traverse(type_field, path)

    return traverse(input_dict, "value")

# Test cases
print(extract_special_types(avro_schema_dict, "io.debezium.time.Date"))
print(extract_special_types(avro_schema_dict, "io.debezium.time.MicroTimestamp"))
print(extract_special_types(avro_schema_dict, "io.debezium.time.ZonedTimestamp"))
print(extract_special_types(avro_schema_dict, "FOOBAR"))
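The paths printed above could then drive the conversion of a decoded record. This is a minimal sketch, assuming the decoded message is a plain Python dict and that the leading `value` segment of each path names the record root. `convert_at_path` and `days_to_date` are hypothetical helpers, not part of this PR, and the sample record is made up for illustration.

```python
from datetime import date, timedelta


def days_to_date(days):
    # Assumes io.debezium.time.Date semantics: days since 1970-01-01.
    return date(1970, 1, 1) + timedelta(days=days)


def convert_at_path(record, path, convert):
    """Apply `convert` in place to the value at a dot-notation path.

    Missing or null segments (for example "before" on an insert event)
    leave the record untouched.
    """
    keys = path.split(".")[1:]  # drop the leading "value" root
    if not keys:
        return record
    node = record
    for key in keys[:-1]:
        node = node.get(key) if isinstance(node, dict) else None
        if node is None:
            return record
    if isinstance(node, dict) and node.get(keys[-1]) is not None:
        node[keys[-1]] = convert(node[keys[-1]])
    return record


# Hypothetical decoded update event with a populated "before" image.
record = {"before": {"id": 1, "date": 19886}, "op": "u"}
convert_at_path(record, "value.before.date", days_to_date)
print(record["before"]["date"])  # 2024-06-12

# An insert event has no "before" image, so nothing is converted.
insert_event = {"before": None, "op": "c"}
convert_at_path(insert_event, "value.before.date", days_to_date)
```

Note that `after` refers to the `Value` record by name in the schema, so the traversal reports only `value.before.*` paths; applying the same conversions to `after` would need those paths to be derived separately.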

gitguardian bot commented Jun 12, 2024

⚠️ GitGuardian has uncovered 2 secrets following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secrets in your pull request
| GitGuardian id | GitGuardian status | Secret | Commit | Filename |
| --- | --- | --- | --- | --- |
| 11697828 | Triggered | Generic Password | b82e22d | lake_lab/docker-compose.lake.yaml |
| 11697828 | Triggered | Generic Password | b82e22d | lake_lab/docker-compose.lake.yaml |
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secrets safely, following best practices.
  3. Revoke and rotate these secrets.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.

To avoid such incidents in the future consider


🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
