diff --git a/snuba/datasets/cdc/groupassignee_processor.py b/snuba/datasets/cdc/groupassignee_processor.py
deleted file mode 100644
index ee5d8410680..00000000000
--- a/snuba/datasets/cdc/groupassignee_processor.py
+++ /dev/null
@@ -1,110 +0,0 @@
-from __future__ import annotations
-
-from dataclasses import dataclass
-from datetime import datetime
-from typing import Any, Mapping, Optional, Sequence, Union
-
-from snuba.datasets.cdc.cdcprocessors import (
-    CdcMessageRow,
-    CdcProcessor,
-    parse_postgres_datetime,
-    postgres_date_to_clickhouse,
-)
-from snuba.writer import WriterTableRow
-
-
-@dataclass(frozen=True)
-class GroupAssigneeRecord:
-    date_added: Union[datetime, str]
-    user_id: Optional[int]
-    team_id: Optional[int]
-
-
-@dataclass(frozen=True)
-class GroupAssigneeRow(CdcMessageRow):
-    offset: Optional[int]
-    record_deleted: bool
-    project_id: int
-    group_id: int
-    record_content: Union[None, GroupAssigneeRecord]
-
-    @classmethod
-    def from_wal(
-        cls,
-        offset: int,
-        columnnames: Sequence[str],
-        columnvalues: Sequence[Any],
-    ) -> GroupAssigneeRow:
-        raw_data = dict(zip(columnnames, columnvalues))
-        return cls(
-            offset=offset,
-            record_deleted=False,
-            project_id=raw_data["project_id"],
-            group_id=raw_data["group_id"],
-            record_content=GroupAssigneeRecord(
-                date_added=parse_postgres_datetime(raw_data["date_added"]),
-                user_id=int(raw_data["user_id"])
-                if raw_data["user_id"] is not None
-                else None,
-                team_id=int(raw_data["team_id"])
-                if raw_data["team_id"] is not None
-                else None,
-            ),
-        )
-
-    @classmethod
-    def from_bulk(
-        cls,
-        row: Mapping[str, Any],
-    ) -> GroupAssigneeRow:
-        return cls(
-            offset=0,
-            record_deleted=False,
-            project_id=row["project_id"],
-            group_id=row["group_id"],
-            record_content=GroupAssigneeRecord(
-                date_added=postgres_date_to_clickhouse(row["date_added"]),
-                user_id=int(row["user_id"]) if row["user_id"] != "" else None,
-                team_id=int(row["team_id"]) if row["team_id"] != "" else None,
-            ),
-        )
-
-    def to_clickhouse(self) -> WriterTableRow:
-        record = self.record_content
-        return {
-            "offset": self.offset if self.offset is not None else 0,
-            "project_id": self.project_id,
-            "group_id": self.group_id,
-            "record_deleted": 1 if self.record_deleted else 0,
-            "date_added": None if not record else record.date_added,
-            "user_id": None if not record else record.user_id,
-            "team_id": None if not record else record.team_id,
-        }
-
-
-class GroupAssigneeProcessor(CdcProcessor):
-    def __init__(self) -> None:
-        postgres_table = "sentry_groupasignee"
-        super().__init__(
-            pg_table=postgres_table,
-            message_row_class=GroupAssigneeRow,
-        )
-
-    def _process_delete(
-        self,
-        offset: int,
-        key: Mapping[str, Any],
-    ) -> Sequence[WriterTableRow]:
-        key_names = key["keynames"]
-        key_values = key["keyvalues"]
-        project_id = key_values[key_names.index("project_id")]
-        group_id = key_values[key_names.index("group_id")]
-        return [
-            GroupAssigneeRow(
-                offset=offset,
-                record_deleted=True,
-                project_id=project_id,
-                group_id=group_id,
-                record_content=None,
-            ).to_clickhouse()
-        ]
diff --git a/snuba/datasets/cdc/row_processors.py b/snuba/datasets/cdc/row_processors.py
index 9e7f4d0ddec..2f312eb859f 100644
--- a/snuba/datasets/cdc/row_processors.py
+++ b/snuba/datasets/cdc/row_processors.py
@@ -1,8 +1,6 @@
 from abc import ABC, abstractmethod
 from typing import Type, cast

-from snuba.datasets.cdc.groupassignee_processor import GroupAssigneeRow
-from snuba.datasets.cdc.groupedmessage_processor import GroupedMessageRow
 from snuba.snapshots import SnapshotTableRow
 from snuba.utils.registered_class import RegisteredClass
 from snuba.writer import WriterTableRow
@@ -24,13 +22,3 @@ def get_from_name(cls, name: str) -> Type["CdcRowProcessor"]:
     @classmethod
     def config_key(cls) -> str:
         return cls.__name__
-
-
-class GroupAssigneeRowProcessor(CdcRowProcessor):
-    def process(self, row: SnapshotTableRow) -> WriterTableRow:
-        return GroupAssigneeRow.from_bulk(row).to_clickhouse()
-
-
-class GroupedMessageRowProcessor(CdcRowProcessor):
-    def process(self, row: SnapshotTableRow) -> WriterTableRow:
-        return GroupedMessageRow.from_bulk(row).to_clickhouse()
diff --git a/snuba/datasets/configuration/groupassignee/dataset.yaml b/snuba/datasets/configuration/groupassignee/dataset.yaml
deleted file mode 100644
index a1164a30ad7..00000000000
--- a/snuba/datasets/configuration/groupassignee/dataset.yaml
+++ /dev/null
@@ -1,6 +0,0 @@
-version: v1
-kind: dataset
-name: groupassignee
-
-entities:
-  - groupassignee
diff --git a/snuba/datasets/configuration/groupassignee/entities/groupassignee.yaml b/snuba/datasets/configuration/groupassignee/entities/groupassignee.yaml
deleted file mode 100644
index 6222a9c2443..00000000000
--- a/snuba/datasets/configuration/groupassignee/entities/groupassignee.yaml
+++ /dev/null
@@ -1,33 +0,0 @@
-version: v1
-kind: entity
-name: groupassignee
-
-schema:
-  [
-    { name: offset, type: UInt, args: { size: 64 } },
-    { name: record_deleted, type: UInt, args: { size: 8 } },
-    { name: project_id, type: UInt, args: { size: 64 } },
-    { name: group_id, type: UInt, args: { size: 64 } },
-    { name: date_added, type: DateTime, args: { schema_modifiers: [nullable] } },
-    { name: user_id, type: UInt, args: { size: 64, schema_modifiers: [nullable] } },
-    { name: team_id, type: UInt, args: { size: 64, schema_modifiers: [nullable] } },
-  ]
-
-storages:
-  - storage: groupassignees
-    is_writable: true
-storage_selector:
-  selector: DefaultQueryStorageSelector
-
-query_processors:
-  - processor: BasicFunctionsProcessor
-validate_data_model: error
-validators: []
-required_time_column: null
-join_relationships:
-  owns:
-    rhs_entity: events
-    join_type: left
-    columns:
-      - [project_id, project_id]
-      - [group_id, group_id]
diff --git a/snuba/datasets/configuration/groupassignee/storages/group_assignees.yaml b/snuba/datasets/configuration/groupassignee/storages/group_assignees.yaml
deleted file mode 100644
index e12d9d3f4ab..00000000000
--- a/snuba/datasets/configuration/groupassignee/storages/group_assignees.yaml
+++ /dev/null
@@ -1,67 +0,0 @@
-version: v1
-kind: cdc_storage
-name: groupassignees
-storage:
-  key: groupassignees
-  set_key: cdc
-readiness_state: deprecate
-schema:
-  columns:
-    [
-      { name: offset, type: UInt, args: { size: 64 } },
-      { name: record_deleted, type: UInt, args: { size: 8 } },
-      { name: project_id, type: UInt, args: { size: 64 } },
-      { name: group_id, type: UInt, args: { size: 64 } },
-      {
-        name: date_added,
-        type: DateTime,
-        args: { schema_modifiers: [nullable] },
-      },
-      {
-        name: user_id,
-        type: UInt,
-        args: { schema_modifiers: [nullable], size: 64 },
-      },
-      {
-        name: team_id,
-        type: UInt,
-        args: { schema_modifiers: [nullable], size: 64 },
-      },
-    ]
-  local_table_name: groupassignee_local
-  dist_table_name: groupassignee_dist
-default_control_topic: cdc_control
-postgres_table: sentry_groupasignee
-row_processor:
-  processor: GroupAssigneeRowProcessor
-allocation_policies:
-  - name: ConcurrentRateLimitAllocationPolicy
-    args:
-      required_tenant_types:
-        - organization_id
-        - referrer
-        - project_id
-      default_config_overrides:
-        is_enforced: 0
-  - name: BytesScannedWindowAllocationPolicy
-    args:
-      required_tenant_types:
-        - organization_id
-        - referrer
-      default_config_overrides:
-        is_enforced: 1
-        throttled_thread_number: 1
-        org_limit_bytes_scanned: 100000
-query_processors:
-  - processor: PrewhereProcessor
-    args:
-      prewhere_candidates:
-        - project_id
-  - processor: ConsistencyEnforcerProcessor
-stream_loader:
-  processor: GroupAssigneeProcessor
-  default_topic: cdc
-  pre_filter:
-    type: CdcTableNameMessageFilter
-    args:
-      postgres_table: sentry_groupasignee
diff --git a/snuba/datasets/configuration/groupedmessage/dataset.yaml b/snuba/datasets/configuration/groupedmessage/dataset.yaml
deleted file mode 100644
index 4f49abe045b..00000000000
--- a/snuba/datasets/configuration/groupedmessage/dataset.yaml
+++ /dev/null
@@ -1,6 +0,0 @@
-version: v1
-kind: dataset
-name: groupedmessage
-
-entities:
-  - groupedmessage
diff --git a/snuba/datasets/configuration/groupedmessage/entities/groupedmessage.yaml b/snuba/datasets/configuration/groupedmessage/entities/groupedmessage.yaml
deleted file mode 100644
index ae01cafd5e7..00000000000
--- a/snuba/datasets/configuration/groupedmessage/entities/groupedmessage.yaml
+++ /dev/null
@@ -1,35 +0,0 @@
-version: v1
-kind: entity
-name: groupedmessage
-
-schema:
-  [
-    { name: offset, type: UInt, args: { size: 64 } },
-    { name: record_deleted, type: UInt, args: { size: 8 } },
-    { name: project_id, type: UInt, args: { size: 64 } },
-    { name: id, type: UInt, args: { size: 64 } },
-    { name: status, type: UInt, args: { schema_modifiers: [nullable], size: 8 } },
-    { name: last_seen, type: DateTime, args: { schema_modifiers: [nullable] } },
-    { name: first_seen, type: DateTime, args: { schema_modifiers: [nullable] } },
-    { name: active_at, type: DateTime, args: { schema_modifiers: [nullable] } },
-    { name: first_release_id, type: UInt, args: { schema_modifiers: [nullable], size: 64 } },
-  ]
-
-storages:
-  - storage: groupedmessages
-    is_writable: true
-storage_selector:
-  selector: DefaultQueryStorageSelector
-
-query_processors:
-  - processor: BasicFunctionsProcessor
-validate_data_model: error
-validators: []
-required_time_column: null
-join_relationships:
-  groups:
-    rhs_entity: events
-    join_type: left
-    columns:
-      - [project_id, project_id]
-      - [id, group_id]
diff --git a/snuba/datasets/configuration/groupedmessage/storages/grouped_messages.yaml b/snuba/datasets/configuration/groupedmessage/storages/grouped_messages.yaml
deleted file mode 100644
index 8c1f77b00aa..00000000000
--- a/snuba/datasets/configuration/groupedmessage/storages/grouped_messages.yaml
+++ /dev/null
@@ -1,86 +0,0 @@
-version: v1
-kind: cdc_storage
-name: groupedmessages
-storage:
-  key: groupedmessages
-  set_key: cdc
-readiness_state: deprecate
-schema:
-  columns:
-    [
-      { name: offset, type: UInt, args: { size: 64 } },
-      { name: record_deleted, type: UInt, args: { size: 8 } },
-      { name: project_id, type: UInt, args: { size: 64 } },
-      { name: id, type: UInt, args: { size: 64 } },
-      {
-        name: status,
-        type: UInt,
-        args: { schema_modifiers: [nullable], size: 8 },
-      },
-      {
-        name: last_seen,
-        type: DateTime,
-        args: { schema_modifiers: [nullable] },
-      },
-      {
-        name: first_seen,
-        type: DateTime,
-        args: { schema_modifiers: [nullable] },
-      },
-      {
-        name: active_at,
-        type: DateTime,
-        args: { schema_modifiers: [nullable] },
-      },
-      {
-        name: first_release_id,
-        type: UInt,
-        args: { schema_modifiers: [nullable], size: 64 },
-      },
-    ]
-  local_table_name: groupedmessage_local
-  dist_table_name: groupedmessage_dist
-  not_deleted_mandatory_condition: record_deleted
-default_control_topic: cdc_control
-postgres_table: sentry_groupedmessage
-row_processor:
-  processor: GroupedMessageRowProcessor
-allocation_policies:
-  - name: ConcurrentRateLimitAllocationPolicy
-    args:
-      required_tenant_types:
-        - organization_id
-        - referrer
-        - project_id
-      default_config_overrides:
-        is_enforced: 0
-  - name: BytesScannedWindowAllocationPolicy
-    args:
-      required_tenant_types:
-        - organization_id
-        - referrer
-      default_config_overrides:
-        is_enforced: 1
-        throttled_thread_number: 1
-        org_limit_bytes_scanned: 100000
-  - name: ReferrerGuardRailPolicy
-    args:
-      required_tenant_types:
-        - referrer
-      default_config_overrides:
-        is_enforced: 0
-        is_active: 0
-query_processors:
-  - processor: PrewhereProcessor
-    args:
-      prewhere_candidates:
-        - project_id
-        - id
-  - processor: ConsistencyEnforcerProcessor
-stream_loader:
-  processor: GroupedMessageProcessor
-  default_topic: cdc
-  pre_filter:
-    type: CdcTableNameMessageFilter
-    args:
-      postgres_table: sentry_groupedmessage
diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py
index c952b57c15c..e73a18ad8c7 100644
--- a/snuba/settings/__init__.py
+++ b/snuba/settings/__init__.py
@@ -97,7 +97,6 @@
         "ca_certs": os.environ.get("CLICKHOUSE_CA_CERTS"),
         "verify": os.environ.get("CLICKHOUSE_VERIFY"),
         "storage_sets": {
-            "cdc",
             "discover",
             "events",
             "events_ro",
diff --git a/snuba/snuba_migrations/events/0007_groupedmessages.py b/snuba/snuba_migrations/events/0007_groupedmessages.py
index 2c0c9e3b0f0..c55a9f6faf8 100644
--- a/snuba/snuba_migrations/events/0007_groupedmessages.py
+++ b/snuba/snuba_migrations/events/0007_groupedmessages.py
@@ -1,8 +1,7 @@
 from typing import Sequence

 from snuba.clickhouse.columns import Column, DateTime, UInt
-from snuba.clusters.storage_sets import StorageSetKey
-from snuba.migrations import migration, operations, table_engines
+from snuba.migrations import migration, operations
 from snuba.migrations.columns import MigrationModifiers as Modifiers

 columns: Sequence[Column[Modifiers]] = [
@@ -23,51 +22,18 @@
     Column("first_release_id", UInt(64, Modifiers(nullable=True))),
 ]

-
+# NOTE: CDC storage deprecated
 class Migration(migration.ClickhouseNodeMigrationLegacy):
     blocking = False

     def forwards_local(self) -> Sequence[operations.SqlOperation]:
-        return [
-            operations.CreateTable(
-                storage_set=StorageSetKey.CDC,
-                table_name="groupedmessage_local",
-                columns=columns,
-                engine=table_engines.ReplacingMergeTree(
-                    storage_set=StorageSetKey.CDC,
-                    version_column="offset",
-                    order_by="(project_id, id)",
-                    sample_by="id",
-                    unsharded=True,
-                ),
-            )
-        ]
+        return []

     def backwards_local(self) -> Sequence[operations.SqlOperation]:
-        return [
-            operations.DropTable(
-                storage_set=StorageSetKey.CDC,
-                table_name="groupedmessage_local",
-            )
-        ]
+        return []

     def forwards_dist(self) -> Sequence[operations.SqlOperation]:
-        return [
-            operations.CreateTable(
-                storage_set=StorageSetKey.CDC,
-                table_name="groupedmessage_dist",
-                columns=columns,
-                engine=table_engines.Distributed(
-                    local_table_name="groupedmessage_local",
-                    sharding_key=None,
-                ),
-            )
-        ]
+        return []

     def backwards_dist(self) -> Sequence[operations.SqlOperation]:
-        return [
-            operations.DropTable(
-                storage_set=StorageSetKey.CDC,
-                table_name="groupedmessage_dist",
-            )
-        ]
+        return []
diff --git a/snuba/snuba_migrations/events/0008_groupassignees.py b/snuba/snuba_migrations/events/0008_groupassignees.py
index 7440db83df5..9e94a2149fc 100644
--- a/snuba/snuba_migrations/events/0008_groupassignees.py
+++ b/snuba/snuba_migrations/events/0008_groupassignees.py
@@ -1,8 +1,7 @@
 from typing import Sequence

 from snuba.clickhouse.columns import Column, DateTime, UInt
-from snuba.clusters.storage_sets import StorageSetKey
-from snuba.migrations import migration, operations, table_engines
+from snuba.migrations import migration, operations
 from snuba.migrations.columns import MigrationModifiers as Modifiers

 columns: Sequence[Column[Modifiers]] = [
@@ -17,50 +16,18 @@
     Column("team_id", UInt(64, Modifiers(nullable=True))),
 ]

-
+# NOTE: CDC storage deprecated
 class Migration(migration.ClickhouseNodeMigrationLegacy):
     blocking = False

     def forwards_local(self) -> Sequence[operations.SqlOperation]:
-        return [
-            operations.CreateTable(
-                storage_set=StorageSetKey.CDC,
-                table_name="groupassignee_local",
-                columns=columns,
-                engine=table_engines.ReplacingMergeTree(
-                    storage_set=StorageSetKey.CDC,
-                    version_column="offset",
-                    order_by="(project_id, group_id)",
-                    unsharded=True,
-                ),
-            )
-        ]
+        return []

     def backwards_local(self) -> Sequence[operations.SqlOperation]:
-        return [
-            operations.DropTable(
-                storage_set=StorageSetKey.CDC,
-                table_name="groupassignee_local",
-            )
-        ]
+        return []

     def forwards_dist(self) -> Sequence[operations.SqlOperation]:
-        return [
-            operations.CreateTable(
-                storage_set=StorageSetKey.CDC,
-                table_name="groupassignee_dist",
-                columns=columns,
-                engine=table_engines.Distributed(
-                    local_table_name="groupassignee_local",
-                    sharding_key=None,
-                ),
-            )
-        ]
+        return []

     def backwards_dist(self) -> Sequence[operations.SqlOperation]:
-        return [
-            operations.DropTable(
-                storage_set=StorageSetKey.CDC,
-                table_name="groupassignee_dist",
-            )
-        ]
+        return []
diff --git a/snuba/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py b/snuba/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py
index 407df2316b4..9312d897d63 100644
--- a/snuba/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py
+++ b/snuba/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py
@@ -102,16 +102,7 @@ class Migration(migration.CodeMigration):
     blocking = True

     def forwards_global(self) -> Sequence[operations.RunPython]:
-        return [
-            operations.RunPython(
-                func=fix_order_by, description="Sync project ID colum for onpremise"
-            ),
-        ]
+        return []

     def backwards_global(self) -> Sequence[operations.RunPython]:
-        return [
-            operations.RunPython(
-                func=ensure_drop_temporary_tables,
-                description="Ensure temporary tables created by the migration are dropped",
-            )
-        ]
+        return []
diff --git a/tests/datasets/cdc/__init__.py b/tests/datasets/cdc/__init__.py
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/tests/datasets/cdc/test_groupassignee.py b/tests/datasets/cdc/test_groupassignee.py
deleted file mode 100644
index 68422c36a4a..00000000000
--- a/tests/datasets/cdc/test_groupassignee.py
+++ /dev/null
@@ -1,238 +0,0 @@
-from datetime import datetime, timezone
-
-import pytest
-
-from snuba.clusters.cluster import ClickhouseClientSettings
-from snuba.consumers.types import KafkaMessageMetadata
-from snuba.datasets.cdc.groupassignee_processor import (
-    GroupAssigneeProcessor,
-    GroupAssigneeRow,
-)
-from snuba.datasets.cdc.types import DeleteEvent, InsertEvent, UpdateEvent
-from snuba.datasets.storages.factory import get_writable_storage
-from snuba.datasets.storages.storage_key import StorageKey
-from snuba.processor import InsertBatch
-from tests.helpers import
write_processed_messages - - -@pytest.mark.clickhouse_db -@pytest.mark.redis_db -class TestGroupassignee: - storage = get_writable_storage(StorageKey.GROUPASSIGNEES) - - UPDATE_MSG_NO_KEY_CHANGE = UpdateEvent( - { - "columnnames": [ - "id", - "project_id", - "group_id", - "user_id", - "date_added", - "team_id", - ], - "columntypes": [ - "bigint", - "bigint", - "bigint", - "integer", - "timestamp with time zone", - "bigint", - ], - "columnvalues": [35, 2, 1359, 1, "2019-09-19 00:17:55+00", None], - "event": "change", - "kind": "update", - "oldkeys": { - "keynames": ["project_id", "group_id"], - "keytypes": ["bigint", "bigint"], - "keyvalues": [2, 1359], - }, - "schema": "public", - "table": "sentry_groupasignee", - "timestamp": "2019-09-19 00:06:56.376853+00", - "xid": 3803891, - } - ) - UPDATE_MSG_WITH_KEY_CHANGE = UpdateEvent( - { - "columnnames": [ - "id", - "project_id", - "group_id", - "user_id", - "date_added", - "team_id", - ], - "columntypes": [ - "bigint", - "bigint", - "bigint", - "integer", - "timestamp with time zone", - "bigint", - ], - "columnvalues": [35, 3, 1359, 1, "2019-09-19 00:17:55+00", None], - "event": "change", - "kind": "update", - "oldkeys": { - "keynames": ["project_id", "group_id"], - "keytypes": ["bigint", "bigint"], - "keyvalues": [2, 1359], - }, - "schema": "public", - "table": "sentry_groupasignee", - "timestamp": "2019-09-19 00:06:56.376853+00", - "xid": 3803891, - } - ) - - DELETE_MSG = DeleteEvent( - { - "event": "change", - "kind": "delete", - "oldkeys": { - "keynames": ["project_id", "group_id"], - "keytypes": ["bigint", "bigint"], - "keyvalues": [2, 1359], - }, - "schema": "public", - "table": "sentry_groupasignee", - "timestamp": "2019-09-19 00:17:21.44787+00", - "xid": 3803954, - } - ) - - INSERT_MSG = InsertEvent( - { - "columnnames": [ - "id", - "project_id", - "group_id", - "user_id", - "date_added", - "team_id", - ], - "columntypes": [ - "bigint", - "bigint", - "bigint", - "integer", - "timestamp with time zone", - "bigint", - ], - "columnvalues": [35, 2, 1359, 1, "2019-09-19 00:17:55+00", None], - "event": "change", - "kind": "insert", - "schema": "public", - "table": "sentry_groupasignee", - "timestamp": "2019-09-19 00:17:55.032443+00", - "xid": 3803982, - } - ) - - PROCESSED = { - "offset": 42, - "project_id": 2, - "group_id": 1359, - "record_deleted": 0, - "user_id": 1, - "team_id": None, - "date_added": datetime(2019, 9, 19, 0, 17, 55, tzinfo=timezone.utc), - } - - PROCESSED_UPDATE = { - "offset": 42, - "project_id": 3, - "group_id": 1359, - "record_deleted": 0, - "user_id": 1, - "team_id": None, - "date_added": datetime(2019, 9, 19, 0, 17, 55, tzinfo=timezone.utc), - } - - DELETED = { - "offset": 42, - "project_id": 2, - "group_id": 1359, - "record_deleted": 1, - "user_id": None, - "team_id": None, - "date_added": None, - } - - def test_messages(self) -> None: - processor = GroupAssigneeProcessor() - - metadata = KafkaMessageMetadata( - offset=42, partition=0, timestamp=datetime(1970, 1, 1) - ) - - ret = processor.process_message(self.INSERT_MSG, metadata) - assert ret == InsertBatch( - [self.PROCESSED], - datetime(2019, 9, 19, 0, 17, 55, 32443, tzinfo=timezone.utc), - ) - write_processed_messages(self.storage, [ret]) - results = ( - self.storage.get_cluster() - .get_query_connection(ClickhouseClientSettings.QUERY) - .execute("SELECT * FROM groupassignee_local;") - .results - ) - assert results[0] == ( - 42, # offset - 0, # deleted - 2, # project_id - 1359, # group_id - datetime(2019, 9, 19, 0, 17, 55), - 1, # user_id - None, # team_id - ) - 
- ret = processor.process_message(self.UPDATE_MSG_NO_KEY_CHANGE, metadata) - assert ret == InsertBatch( - [self.PROCESSED], - datetime(2019, 9, 19, 0, 6, 56, 376853, tzinfo=timezone.utc), - ) - - # Tests an update with key change which becomes a two inserts: - # one deletion and the insertion of the new row. - ret = processor.process_message(self.UPDATE_MSG_WITH_KEY_CHANGE, metadata) - assert ret == InsertBatch( - [self.DELETED, self.PROCESSED_UPDATE], - datetime(2019, 9, 19, 0, 6, 56, 376853, tzinfo=timezone.utc), - ) - - ret = processor.process_message(self.DELETE_MSG, metadata) - assert ret == InsertBatch( - [self.DELETED], - datetime(2019, 9, 19, 0, 17, 21, 447870, tzinfo=timezone.utc), - ) - - def test_bulk_load(self) -> None: - row = GroupAssigneeRow.from_bulk( - { - "project_id": "2", - "group_id": "1359", - "date_added": "2019-09-19 00:17:55+00", - "user_id": "1", - "team_id": "", - } - ) - write_processed_messages( - self.storage, [InsertBatch([row.to_clickhouse()], None)] - ) - ret = ( - self.storage.get_cluster() - .get_query_connection(ClickhouseClientSettings.QUERY) - .execute("SELECT * FROM groupassignee_local;") - .results - ) - assert ret[0] == ( - 0, # offset - 0, # deleted - 2, # project_id - 1359, # group_id - datetime(2019, 9, 19, 0, 17, 55), - 1, # user_id - None, # team_id - ) diff --git a/tests/datasets/cdc/test_groupedmessage.py b/tests/datasets/cdc/test_groupedmessage.py deleted file mode 100644 index 97c16052feb..00000000000 --- a/tests/datasets/cdc/test_groupedmessage.py +++ /dev/null @@ -1,301 +0,0 @@ -from datetime import datetime, timezone - -import pytest - -from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster -from snuba.clusters.storage_sets import StorageSetKey -from snuba.consumers.types import KafkaMessageMetadata -from snuba.datasets.cdc.groupedmessage_processor import ( - GroupedMessageProcessor, - GroupedMessageRow, -) -from snuba.datasets.cdc.types import DeleteEvent, InsertEvent, UpdateEvent -from snuba.datasets.storages.factory import get_writable_storage -from snuba.datasets.storages.storage_key import StorageKey -from snuba.processor import InsertBatch -from tests.helpers import write_processed_messages - - -@pytest.mark.clickhouse_db -@pytest.mark.redis_db -class TestGroupedMessage: - storage = get_writable_storage(StorageKey.GROUPEDMESSAGES) - - UPDATE_MSG = UpdateEvent( - { - "columnnames": [ - "id", - "logger", - "level", - "message", - "view", - "status", - "times_seen", - "last_seen", - "first_seen", - "data", - "score", - "project_id", - "time_spent_total", - "time_spent_count", - "resolved_at", - "active_at", - "is_public", - "platform", - "num_comments", - "first_release_id", - "short_id", - ], - "columntypes": [ - "bigint", - "character varying(64)", - "integer", - "text", - "character varying(200)", - "integer", - "integer", - "timestamp with time zone", - "timestamp with time zone", - "text", - "integer", - "bigint", - "integer", - "integer", - "timestamp with time zone", - "timestamp with time zone", - "boolean", - "character varying(64)", - "integer", - "bigint", - "bigint", - ], - "columnvalues": [ - 74, - "", - 40, - " ZeroDivisionError integer division or modulo by " - "zero client3.py __main__ in ", - "__main__ in ", - 0, - 2, - "2019-06-19 06:46:28+00", - "2019-06-19 06:45:32+00", - 
"eJyT7tuwzAM3PkV2pzJiO34VRSdmvxAgA5dCtViDAGyJEi0AffrSxrZOlSTjrzj3Z1MrOBekCWHBcQaPj4xhXe72WyDv6YU0ouynnDGpMxzrEJSSzCrC+p7Vz8sgNhAvhdOZ/pKOKHd0PC5C9yqtjuPddcPQ9n0w8hPiLRHsWvZGsWD/91xIya2IFxz7vJWfTUlHHnwSCEBUkbTZrxCCcOf2baY/XTU1VJm9cjHL4JriHPYvOnliyP0Jt2q4SpLkz7v6owW9E9rEOvl0PawczxcvkLIWppxg==", - 1560926969, - 2, - 0, - 0, - None, - "2019-06-19 06:45:32+00", - False, - "python", - 0, - None, - 20, - ], - "event": "change", - "kind": "update", - "oldkeys": {"keynames": ["id"], "keytypes": ["bigint"], "keyvalues": [74]}, - "schema": "public", - "table": "sentry_groupedmessage", - "timestamp": "2019-09-19 00:17:21.44787+00", - "xid": 2380866, - } - ) - - DELETE_MSG = DeleteEvent( - { - "event": "change", - "kind": "delete", - "oldkeys": { - "keynames": ["id", "project_id"], - "keytypes": ["bigint", "bigint"], - "keyvalues": [74, 2], - }, - "schema": "public", - "table": "sentry_groupedmessage", - "timestamp": "2019-09-19 00:17:21.44787+00", - "xid": 2380866, - } - ) - - INSERT_MSG = InsertEvent( - { - "columnnames": [ - "id", - "logger", - "level", - "message", - "view", - "status", - "times_seen", - "last_seen", - "first_seen", - "data", - "score", - "project_id", - "time_spent_total", - "time_spent_count", - "resolved_at", - "active_at", - "is_public", - "platform", - "num_comments", - "first_release_id", - "short_id", - ], - "columntypes": [ - "bigint", - "character varying(64)", - "integer", - "text", - "character varying(200)", - "integer", - "integer", - "timestamp with time zone", - "timestamp with time zone", - "text", - "integer", - "bigint", - "integer", - "integer", - "timestamp with time zone", - "timestamp with time zone", - "boolean", - "character varying(64)", - "integer", - "bigint", - "bigint", - ], - "columnvalues": [ - 74, - "", - 40, - " ZeroDivisionError integer division or modulo by " - "zero client3.py __main__ in ", - "__main__ in ", - 0, - 2, - "2019-06-19 06:46:28+00", - "2019-06-19 06:45:32+00", - "eJyT7tuwzAM3PkV2pzJiO34VRSdmvxAgA5dCtViDAGyJEi0AffrSxrZOlSTjrzj3Z1MrOBekCWHBcQaPj4xhXe72WyDv6YU0ouynnDGpMxzrEJSSzCrC+p7Vz8sgNhAvhdOZ/pKOKHd0PC5C9yqtjuPddcPQ9n0w8hPiLRHsWvZGsWD/91xIya2IFxz7vJWfTUlHHnwSCEBUkbTZrxCCcOf2baY/XTU1VJm9cjHL4JriHPYvOnliyP0Jt2q4SpLkz7v6owW9E9rEOvl0PawczxcvkLIWppxg==", - 1560926969, - 2, - 0, - 0, - None, - "2019-06-19 06:45:32+00", - False, - "python", - 0, - None, - 20, - ], - "event": "change", - "kind": "insert", - "schema": "public", - "table": "sentry_groupedmessage", - "timestamp": "2019-09-19 00:17:21.44787+00", - "xid": 2380866, - } - ) - - PROCESSED = { - "offset": 42, - "project_id": 2, - "id": 74, - "record_deleted": 0, - "status": 0, - "last_seen": datetime(2019, 6, 19, 6, 46, 28, tzinfo=timezone.utc), - "first_seen": datetime(2019, 6, 19, 6, 45, 32, tzinfo=timezone.utc), - "active_at": datetime(2019, 6, 19, 6, 45, 32, tzinfo=timezone.utc), - "first_release_id": None, - } - - DELETED = { - "offset": 42, - "project_id": 2, - "id": 74, - "record_deleted": 1, - "status": None, - "last_seen": None, - "first_seen": None, - "active_at": None, - "first_release_id": None, - } - - def test_messages(self) -> None: - processor = GroupedMessageProcessor() - - metadata = KafkaMessageMetadata( - offset=42, partition=0, timestamp=datetime(1970, 1, 1) - ) - - ret = processor.process_message(self.INSERT_MSG, metadata) - assert ret == InsertBatch( - [self.PROCESSED], - datetime(2019, 9, 19, 0, 17, 21, 447870, tzinfo=timezone.utc), - ) - write_processed_messages(self.storage, [ret]) - results = ( - get_cluster(StorageSetKey.EVENTS) - 
.get_query_connection(ClickhouseClientSettings.INSERT) - .execute("SELECT * FROM groupedmessage_local;") - .results - ) - assert results[0] == ( - 42, # offset - 0, # deleted - 2, # project_id - 74, # id - 0, # status - datetime(2019, 6, 19, 6, 46, 28), - datetime(2019, 6, 19, 6, 45, 32), - datetime(2019, 6, 19, 6, 45, 32), - None, - ) - - ret = processor.process_message(self.UPDATE_MSG, metadata) - assert ret == InsertBatch( - [self.PROCESSED], - datetime(2019, 9, 19, 0, 17, 21, 447870, tzinfo=timezone.utc), - ) - - ret = processor.process_message(self.DELETE_MSG, metadata) - assert ret == InsertBatch( - [self.DELETED], - datetime(2019, 9, 19, 0, 17, 21, 447870, tzinfo=timezone.utc), - ) - - def test_bulk_load(self) -> None: - row = GroupedMessageRow.from_bulk( - { - "project_id": "2", - "id": "10", - "status": "0", - "last_seen": "2019-06-28 17:57:32+00", - "first_seen": "2019-06-28 06:40:17+00", - "active_at": "2019-06-28 06:40:17+00", - "first_release_id": "26", - } - ) - write_processed_messages( - self.storage, [InsertBatch([row.to_clickhouse()], None)] - ) - ret = ( - get_cluster(StorageSetKey.EVENTS) - .get_query_connection(ClickhouseClientSettings.QUERY) - .execute("SELECT * FROM groupedmessage_local;") - .results - ) - assert ret[0] == ( - 0, # offset - 0, # deleted - 2, # project_id - 10, # id - 0, # status - datetime(2019, 6, 28, 17, 57, 32), - datetime(2019, 6, 28, 6, 40, 17), - datetime(2019, 6, 28, 6, 40, 17), - 26, - ) diff --git a/tests/datasets/cdc/test_message_filters.py b/tests/datasets/cdc/test_message_filters.py deleted file mode 100644 index f7d25263627..00000000000 --- a/tests/datasets/cdc/test_message_filters.py +++ /dev/null @@ -1,47 +0,0 @@ -from datetime import datetime - -from arroyo.backends.kafka import KafkaPayload -from arroyo.types import BrokerValue, Message, Partition, Topic - -from snuba.datasets.message_filters import CdcTableNameMessageFilter - - -def test_table_name_filter() -> None: - table_name = "table_name" - message_filter = CdcTableNameMessageFilter(table_name) - - # Messages that math the table should not be dropped. - assert not message_filter.should_drop( - Message( - BrokerValue( - KafkaPayload(None, b"", [("table", table_name.encode("utf8"))]), - Partition(Topic("topic"), 0), - 0, - datetime.now(), - ) - ) - ) - - # Messages without a table should be dropped. - assert message_filter.should_drop( - Message( - BrokerValue( - KafkaPayload(None, b"", []), - Partition(Topic("topic"), 0), - 0, - datetime.now(), - ) - ) - ) - - # Messages from a different table should be dropped. 
- assert message_filter.should_drop( - Message( - BrokerValue( - KafkaPayload(None, b"", [("table", b"other_table")]), - Partition(Topic("topic"), 0), - 0, - datetime.now(), - ) - ) - ) diff --git a/tests/datasets/storages/test_storage_factory.py b/tests/datasets/storages/test_storage_factory.py index 2de2a91d87b..d538e58677c 100644 --- a/tests/datasets/storages/test_storage_factory.py +++ b/tests/datasets/storages/test_storage_factory.py @@ -13,8 +13,6 @@ StorageKey.DISCOVER, StorageKey.ERRORS, StorageKey.ERRORS_RO, - StorageKey.GROUPEDMESSAGES, - StorageKey.GROUPASSIGNEES, StorageKey.METRICS_COUNTERS, StorageKey.ORG_METRICS_COUNTERS, StorageKey.METRICS_DISTRIBUTIONS, diff --git a/tests/migrations/test_runner_individual.py b/tests/migrations/test_runner_individual.py index 18e673f70b4..4fa8138a082 100644 --- a/tests/migrations/test_runner_individual.py +++ b/tests/migrations/test_runner_individual.py @@ -11,7 +11,7 @@ from snuba.consumers.types import KafkaMessageMetadata from snuba.datasets.storages.factory import get_writable_storage from snuba.datasets.storages.storage_key import StorageKey -from snuba.migrations.groups import MigrationGroup, get_group_loader +from snuba.migrations.groups import MigrationGroup from snuba.migrations.runner import MigrationKey, Runner from snuba.migrations.status import Status from snuba.processor import InsertBatch @@ -146,59 +146,6 @@ def generate_transactions() -> None: ).write(rows) -@pytest.mark.clickhouse_db -def test_groupedmessages_compatibility() -> None: - cluster = get_cluster(StorageSetKey.EVENTS) - - # Ignore the multi node mode because this tests a migration - # for an older table state that only applied to single node - if not cluster.is_single_node(): - return - - database = cluster.get_database() - connection = cluster.get_query_connection(ClickhouseClientSettings.MIGRATE) - - # Create old style table witihout project ID - connection.execute( - """ - CREATE TABLE groupedmessage_local (`offset` UInt64, `record_deleted` UInt8, - `id` UInt64, `status` Nullable(UInt8), `last_seen` Nullable(DateTime), - `first_seen` Nullable(DateTime), `active_at` Nullable(DateTime), - `first_release_id` Nullable(UInt64)) ENGINE = ReplacingMergeTree(offset) - ORDER BY id SAMPLE BY id SETTINGS index_granularity = 8192 - """ - ) - - migration_id = "0010_groupedmessages_onpremise_compatibility" - - runner = Runner() - runner.run_migration( - MigrationKey(MigrationGroup.SYSTEM, "0001_migrations"), force=True - ) - events_migrations = get_group_loader(MigrationGroup.EVENTS).get_migrations() - - # Mark prior migrations complete - for migration in events_migrations[: (events_migrations.index(migration_id))]: - runner._update_migration_status( - MigrationKey(MigrationGroup.EVENTS, migration), Status.COMPLETED - ) - - runner.run_migration( - MigrationKey(MigrationGroup.EVENTS, migration_id), - force=True, - ) - - outcome = perform_select_query( - ["primary_key"], - "system.tables", - {"name": "groupedmessage_local", "database": str(database)}, - None, - connection, - ) - - assert outcome == [("project_id, id",)] - - @pytest.mark.clickhouse_db def run_prior_migrations( migration_group: MigrationGroup, stop_migration_id: str, runner: Runner diff --git a/tests/snapshots/test_postgres_snapshot.py b/tests/snapshots/test_postgres_snapshot.py deleted file mode 100644 index 627aa2b5f82..00000000000 --- a/tests/snapshots/test_postgres_snapshot.py +++ /dev/null @@ -1,122 +0,0 @@ -from pathlib import Path - -import pytest - -from snuba.snapshots import ( # NOQA - ColumnConfig, - 
DateFormatPrecision, - DateTimeFormatterConfig, -) -from snuba.snapshots.postgres_snapshot import PostgresSnapshot - -META_FILE = """ -{ - "snapshot_id": "50a86ad6-b4b7-11e9-a46f-acde48001122", - "product": "snuba", - "transactions": { - "xmin": 3372750, - "xmax": 3372754, - "xip_list": [] - }, - "content": [ - { - "table": "sentry_groupedmessage", - "zip": false, - "columns": [ - {"name": "id"}, - {"name": "status"} - ] - }, - { - "table": "sentry_groupasignee", - "zip": true, - "columns": [ - {"name": "id"}, - { - "name": "a_date", - "formatter": { - "type": "datetime", - "precision": "second" - } - } - ] - } - ], - "start_timestamp": 1564703503.682226 -} -""" - - -class TestPostgresSnapshot: - def __prepare_directory(self, tmp_path: Path, table_content: str) -> str: - snapshot_base = tmp_path / "cdc-snapshot" - snapshot_base.mkdir() - meta = snapshot_base / "metadata.json" - meta.write_text(META_FILE) - tables_dir = tmp_path / "cdc-snapshot" / "tables" - tables_dir.mkdir() - groupedmessage = tables_dir / "sentry_groupedmessage.csv" - groupedmessage.write_text(table_content) - groupassignee = tables_dir / "sentry_groupasignee" - groupassignee.write_text( - """id,project_id -""" - ) - return str(snapshot_base) - - def test_parse_snapshot(self, tmp_path: Path) -> None: - snapshot_base = self.__prepare_directory( - tmp_path, - """id,status -0,1 -""", - ) - snapshot = PostgresSnapshot.load("snuba", snapshot_base) - descriptor = snapshot.get_descriptor() - assert descriptor.id == "50a86ad6-b4b7-11e9-a46f-acde48001122" - assert descriptor.xmax == 3372754 - assert descriptor.xmin == 3372750 - assert descriptor.xip_list == [] - tables = { - table_config.table: (table_config.columns, table_config.zip) - for table_config in descriptor.tables - } - assert "sentry_groupedmessage" in tables - assert tables["sentry_groupedmessage"] == ( - [ColumnConfig("id"), ColumnConfig("status")], - False, - ) - assert "sentry_groupasignee" in tables - assert tables["sentry_groupasignee"] == ( - [ - ColumnConfig("id"), - ColumnConfig( - "a_date", - formatter=DateTimeFormatterConfig( - precision=DateFormatPrecision.SECOND - ), - ), - ], - True, - ) - - with snapshot.get_parsed_table_file("sentry_groupedmessage") as table: - assert next(table) == { - "id": "0", - "status": "1", - } - - with snapshot.get_preprocessed_table_file("sentry_groupedmessage") as table: - assert next(table) == b"id,status\n0,1\n" - - def test_parse_invalid_snapshot(self, tmp_path: Path) -> None: - snapshot_base = self.__prepare_directory( - tmp_path, - """id -0 -""", - ) - with pytest.raises(ValueError, match=".+sentry_groupedmessage.+status.+"): - snapshot = PostgresSnapshot.load("snuba", snapshot_base) - with snapshot.get_parsed_table_file("sentry_groupedmessage") as table: - next(table)