-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathcheckHistoryData.py
More file actions
105 lines (82 loc) · 4.06 KB
/
checkHistoryData.py
File metadata and controls
105 lines (82 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# -*- coding: utf-8 -*-
# @Time : 2019-02-14 14:09
# @Author : Dingzh.tobest
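# Description: validate the locally stored bar data contract by contract and report the dates whose data is missing or inconsistent.
# NOTE(review): the original header also says missing data is back-filled, but no back-fill code is present in this file.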
from __future__ import print_function
import sys
import json
from datetime import datetime
from time import time, sleep
from pymongo import MongoClient, ASCENDING
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME, DAILY_DB_NAME
import jqdatasdk
# Load the settings (MongoDB endpoint and JQData credentials) from
# config.json, looked up relative to the current working directory.
with open('config.json') as config:  # ensures the handle is closed after parsing
    setting = json.load(config)

MONGO_HOST = setting['MONGO_HOST']
MONGO_PORT = setting['MONGO_PORT']
JQDATA_USER = setting['JQDATA_USER']
JQDATA_PASSWORD = setting['JQDATA_PASSWORD']

mc = MongoClient(MONGO_HOST, MONGO_PORT)  # Mongo connection
minute_db = mc[MINUTE_DB_NAME]  # minute-bar database
daily_db = mc[DAILY_DB_NAME]  # daily-bar database
def _collect_bar_errors(symbol_index, vt_symbol, start_date, end_date,
                        daily_db, minute_db):
    """Return ``(daily_error_list, minute_error_list)`` for one contract.

    Both lists hold date strings (dashes stripped from ``str(index)[:10]``)
    for the trading days whose locally stored bars failed validation.
    """
    prices_df = jqdatasdk.get_price(symbol_index, start_date=start_date, end_date=end_date,
                                    frequency='daily', fields=['open', 'close', 'high', 'low'])
    prices_df = prices_df.dropna()

    # Pair every row with the index value of the following row; it is used
    # as the end of the minute-bar query window for that day.
    # NOTE(review): after shift(-1) the last row has no successor, so its
    # 'next_trade_day' is missing (NaT/NaN) - confirm jqdatasdk accepts that
    # as end_date.
    prices_df['next_trade_day'] = prices_df.index
    prices_df['next_trade_day'] = prices_df['next_trade_day'].shift(-1)

    symbol_daily_db = daily_db[vt_symbol]
    symbol_minute_db = minute_db[vt_symbol]

    daily_error_list = []
    minute_error_list = []
    last_count = 0  # last local minute-bar count that matched JQData

    for index, row in prices_df.iterrows():
        day = str(index)[:10]  # presumably 'YYYY-MM-DD' (index looks like a timestamp)
        date = day.replace('-', '')
        print('开始校验数据:' + date)

        # Daily check: exactly one daily bar must be stored for the date.
        # NOTE: Cursor.count() is deprecated since PyMongo 3.7 and removed in
        # 4.0 (replacement: Collection.count_documents); kept here so the
        # script still runs on older PyMongo installations.
        daybar_count = symbol_daily_db.find({"date": date}).count()
        if daybar_count != 1:
            daily_error_list.append(date)

        # Minute check: no bars at all is always an error.
        day_count = symbol_minute_db.find({"date": date}).count()
        if day_count == 0:
            minute_error_list.append(date)
            continue

        # Query JQData only when the local count differs from the last count
        # that was verified; an unchanged count is accepted without a query.
        if day_count != last_count:
            df = jqdatasdk.get_price(symbol_index, start_date=day, end_date=row['next_trade_day'],
                                     frequency='minute', fields=['close'])
            if len(df) != day_count:
                minute_error_list.append(date)
                continue
            last_count = day_count

    return daily_error_list, minute_error_list


def _write_error_file(vt_symbol, daily_error_list, minute_error_list):
    """Write both error-date lists to ``<vt_symbol>.error`` in the working directory."""
    # The context manager closes the file even if one of the writes fails.
    with open(vt_symbol + '.error', 'w') as err_fw:
        err_fw.write('日线错误\r\n')
        err_fw.write('\r\n'.join(daily_error_list))
        err_fw.write('\r\n分钟线错误\r\n')
        err_fw.write('\r\n'.join(minute_error_list))


def checkHistoryData(symbols_list, start_date, end_date=None):
    """Validate locally stored daily and minute bars against JQData.

    For every futures contract whose ``name`` (as returned by
    ``jqdatasdk.get_all_securities(types=['futures'])``) is in
    ``symbols_list``, each day returned by the daily price query is checked:

    * the daily collection must hold exactly one document for the date;
    * the minute collection must hold at least one document, and when its
      count differs from the last verified count it must equal the number
      of minute bars JQData returns for that day.

    Contracts with any failing date get a ``<name>.error`` file, and a
    one-line summary per failing contract is printed at the end.

    :param symbols_list: contract names to validate.
    :param start_date: first date of the range, passed to ``jqdatasdk.get_price``.
    :param end_date: last date of the range, passed to ``jqdatasdk.get_price``.
        ``None`` (the default) means today's date, resolved at call time
        rather than once at import time.
    :return: None.
    """
    if end_date is None:
        end_date = datetime.today().date()

    jqdatasdk.auth(JQDATA_USER, JQDATA_PASSWORD)

    # Look up the metadata of the contracts that have to be validated.
    symbols_df = jqdatasdk.get_all_securities(types=['futures'])
    symbols_df = symbols_df[symbols_df['name'].isin(symbols_list)]

    mc = MongoClient(MONGO_HOST, MONGO_PORT)  # connection owned by this call
    err_str = ''
    try:
        minute_db = mc[MINUTE_DB_NAME]  # minute-bar database
        daily_db = mc[DAILY_DB_NAME]  # daily-bar database

        # Validate daily and minute bars day by day for each contract.
        for symbol_index, symbol_row in symbols_df.iterrows():
            vt_symbol = symbol_row['name']
            print('start==>' + vt_symbol)

            daily_error_list, minute_error_list = _collect_bar_errors(
                symbol_index, vt_symbol, start_date, end_date, daily_db, minute_db)

            if daily_error_list or minute_error_list:
                _write_error_file(vt_symbol, daily_error_list, minute_error_list)
                err_str = (err_str + vt_symbol
                           + ', error_info : ' + str(len(daily_error_list))
                           + ', minute_err_info : ' + str(len(minute_error_list))
                           + '\r\n')
    finally:
        # Release the connection opened above, even when validation fails.
        mc.close()

    print(err_str)
if __name__ == '__main__':
    # Script entry point: validate two contracts over a fixed date range.
    target_symbols = ['I8888', 'RB8888']
    range_start = '2018-01-01'
    range_end = '2019-02-13'
    checkHistoryData(target_symbols, range_start, range_end)