-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapplication.py
More file actions
92 lines (69 loc) · 2.17 KB
/
Copy pathapplication.py
File metadata and controls
92 lines (69 loc) · 2.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from flask import Flask
from flask import jsonify
import boto3
import json
import decimal
from boto3.dynamodb.conditions import Key, Attr
import logging
application = Flask(__name__)
logging.basicConfig(level=logging.INFO)
# Helper class to convert a DynamoDB item to JSON.
class DecimalEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, decimal.Decimal):
if o % 1 > 0:
return float(o)
else:
return int(o)
return super(DecimalEncoder, self).default(o)
application.json_encoder = DecimalEncoder
dynamodb = boto3.resource(service_name='dynamodb',
region_name='us-east-1',
# endpoint_url="http://localhost:8000"
)
table = dynamodb.Table('Movies')
@application.route('/')
def api_intro():
intro = \
"""
<h2> Welcome to the DataNorth API! </h2>
<h4> The following endpoints are available: </h4>
<ul>
<li>/movies/year</li>
</ul>
"""
return intro
@application.route('/movies/')
def movies_info():
endpoints = ['year']
return jsonify({'endpoints': endpoints})
@application.route('/movies/<year>/')
def movies(year):
""" Sample movies endpoint. """
fe = Key('year').eq(int(year));
pe = "#yr, title, info.rating"
# Expression Attribute Names for Projection Expression only.
ean = { "#yr": "year", }
esk = None
response = table.scan(
FilterExpression=fe,
ProjectionExpression=pe,
ExpressionAttributeNames=ean
)
results = [i for i in response['Items']]
# for i in response['Items']:
# print(json.dumps(i, cls=DecimalEncoder))
while 'LastEvaluatedKey' in response:
response = table.scan(
ProjectionExpression=pe,
FilterExpression=fe,
ExpressionAttributeNames= ean,
ExclusiveStartKey=response['LastEvaluatedKey']
)
for i in response['Items']:
# print(json.dumps(i, cls=DecimalEncoder))
results.append(i)
return jsonify(items=results)
if __name__ == "__main__":
application.debug = True
application.run()