-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsql_queries
More file actions
74 lines (65 loc) · 1.84 KB
/
sql_queries
File metadata and controls
74 lines (65 loc) · 1.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
-- Source table over the Kinesis stream 'raw_data_stream' in eu-north-1.
-- Payload fields (user_id, postcode, webpage) are decoded from JSON.
-- event_timestamp is not read from the payload: it is populated from the
-- connector's 'timestamp' metadata key (per the Flink Kinesis connector docs,
-- the record's approximate arrival time in the stream).
-- The watermark trails event_timestamp by 5 seconds, which makes the column
-- usable as the event-time attribute in the windowed queries on this table.
%flink.ssql
CREATE TABLE raw_data_table (
    user_id         INT,
    postcode        VARCHAR(4),
    webpage         VARCHAR(255),
    event_timestamp TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
)
PARTITIONED BY (user_id)
WITH (
    'connector'                      = 'kinesis',
    'stream'                         = 'raw_data_stream',
    'aws.region'                     = 'eu-north-1',
    'scan.stream.initpos'            = 'LATEST',
    'format'                         = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)
------------------------------------------
-- Ad-hoc preview of the raw stream.
-- Columns are named explicitly (instead of SELECT *) so the shape of the
-- preview stays fixed even if raw_data_table gains columns later.
%flink.ssql
SELECT
    user_id,
    postcode,
    webpage,
    event_timestamp
FROM raw_data_table
------------------------------------------
-- Preview of the aggregation: number of pageviews per postcode, counted over
-- consecutive 1-minute tumbling windows on event_timestamp.
-- start_time / end_time are the bounds of the window each count belongs to.
%flink.ssql(type=update)
SELECT
    postcode,
    TUMBLE_START(event_timestamp, INTERVAL '1' MINUTE) AS start_time,
    TUMBLE_END(event_timestamp, INTERVAL '1' MINUTE)   AS end_time,
    COUNT(*)                                           AS pageviews
FROM raw_data_table
GROUP BY
    postcode,
    TUMBLE(event_timestamp, INTERVAL '1' MINUTE);
------------------------------------------
-- Sink table over the Kinesis stream 'aggregated_data_stream' in eu-north-1;
-- rows are serialised as JSON with ISO-8601 timestamps.
-- One row per (postcode, 1-minute window), written by the INSERT that
-- aggregates raw_data_table.
-- pageviews is BIGINT because it is fed by COUNT(*), which Flink types as
-- BIGINT NOT NULL; an INT column makes the INSERT fail schema validation,
-- since Flink does not implicitly narrow BIGINT to INT for sink columns.
%flink.ssql
CREATE TABLE aggregated_data_table (
    postcode   VARCHAR(6),
    start_time TIMESTAMP(3),
    end_time   TIMESTAMP(3),
    pageviews  BIGINT
)
WITH (
    'connector'                      = 'kinesis',
    'stream'                         = 'aggregated_data_stream',
    'aws.region'                     = 'eu-north-1',
    'scan.stream.initpos'            = 'LATEST',
    'format'                         = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);
------------------------------------------
-- Continuous job: writes the per-postcode pageview count of every 1-minute
-- tumbling window on raw_data_table into aggregated_data_table.
-- The select list follows the sink's column order:
-- postcode, start_time, end_time, pageviews.
%flink.ssql(type=update)
INSERT INTO aggregated_data_table
SELECT
    postcode,
    TUMBLE_START(event_timestamp, INTERVAL '1' MINUTE) AS start_time,
    TUMBLE_END(event_timestamp, INTERVAL '1' MINUTE)   AS end_time,
    COUNT(*)                                           AS pageviews
FROM raw_data_table
GROUP BY
    postcode,
    TUMBLE(event_timestamp, INTERVAL '1' MINUTE);