本教程将介绍如何把Tushare的沪深股票2008年到2017年的日线行情数据和每日指标数据导入到DolphinDB,并使用DolphinDB进行金融分析。Tushare是金融大数据开放社区,拥有丰富的金融数据,如股票、基金、期货、数字货币等行情数据,为量化从业人员和金融相关研究人员免费提供金融数据。
1. 数据概况
Tushare提供的沪深股票日线行情数据包含以下字段:
名称 | 描述 |
ts_code | 股票代码 |
trade_date | 交易日期 |
open | 开盘价 |
high | 最高价 |
low | 最低价 |
close | 收盘价 |
pre_close | 昨收价 |
change | 涨跌额 |
pct_change | 涨跌幅 |
vol | 成交量(手) |
amount | 成交额(千元) |
每日指标数据包含以下字段:
名称 | 描述 |
ts_code | 股票代码 |
trade_date | 交易日期 |
close | 收盘价 |
turnover_rate | 换手率 |
turnover_rate_f | 换手率(自由流通股) |
volume_ration | 量比 |
pe | 市盈率(总市值/净利润) |
pe_ttm | 市盈率(TTM) |
pb | 市净率(总市值/净资产) |
ps | 市销率 |
ps_ttm | 市销率(TTM) |
total_share | 总股本(万) |
float_share | 流通股本(万) |
free_share | 自由流通股本(万) |
total_mv | 总市值(万元) |
cric_mv | 流通市值(万元) |
2. 创建DolphinDB数据库
2.1 安装DolphinDB
从官网下载DolphinDB安装包和DolphinDB GUI。
DolphinDB单节点部署请参考教程单节点部署。
DolphinDB单服务器集群部署请参考单服务器集群部署。
DolphinDB多物理服务器部署请参考多服务器集群部署。
2.2 创建数据库
我们可以使用database函数创建分区数据库。
语法:database(directory, [partitionType], [partitionScheme], [locations])
参数:
directory:数据库保存的目录。DolphinDB有三种类型的数据库,分别是内存数据库、磁盘上的数据库和分布式文件系统上的数据库。创建内存数据库,directory为空;创建本地数据库,directory应该是本地文件系统目录;创建分布式文件系统上的数据库,directory应该以“dfs://”开头。本教程使用分布式文件系统上的数据库。
partitionType:分区方式,有6种方式: 顺序分区(SEQ),范围分区(RANGE),哈希分区(HASH),值分区(VALUE),列表分区(LIST),复合分区(COMPO)。
partitionScheme:分区方案。各种分区方式对应的分区方案如下:
导入数据前,要做好数据的分区规划,主要考虑两个因素:分区字段和分区粒度。
在日常的查询分析中,按照日期查询的频率最高,所以分区字段为日期trade_date。如果一天一个分区,每个分区的数据量过少,只有3000多条数据,不到1兆大小,而且分区数量非常多。分布式系统在执行查询时,会把查询语句分成多个子任务发送到不同的分区。这样的分区方式会导致子任务数量非常多,而每个任务执行的时间极短,系统在管理任务上耗费的时间反而大于任务本身的执行时间,明显这样的分区方式是不合理。这种情况下,我们按日期范围进行分区,每年的1月1日到次年的1月1日为一个分区,这样既能提升查询的效率,也不会造成分区粒度过小。
现有数据的时间跨度是2008-2017年,但是为了给未来的数据留出足够的空间,我们把时间范围设置为2008-2030年。执行以下代码:
- yearRange=date(2008.01M + 12*0..22)
- login("admin","123456")
- dbPath="dfs://tushare"
- yearRange=date(2008.01M + 12*0..22)
- if(existsDatabase(dbPath)){
- dropDatabase(dbPath)
- }
- columns1=`ts_code`trade_date`open`high`low`close`pre_close`change`pct_change`vol`amount
- type1=`SYMBOL`NANOTIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
- db=database(dbPath,RANGE,yearRange)
- hushen_daily_line=db.createPartitionedTable(table(100000000:0,columns1,type1),`hushen_daily_line,`trade_date)
- columns2=`ts_code`trade_date`close`turnover_rate`turnover_rate_f`volume_ratio`pe`pr_ttm`pb`ps`ps_ttm`total_share`float_share`free_share`total_mv`circ_mv
- type2=`SYMBOL`NANOTIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
- hushen_daily_indicator=db.createPartitionedTable(table(100000000:0,columns2,type2),`hushen_daily_indicator,`trade_date)
- 通过Tushare Python包,返回的是python dataframe类型数据。
- 通过http协议直接获取,返回的是Json格式数据。
- python setup.py install
- python setup.py install --force
- import datetime
- import tushare as ts
- import pandas as pd
- import numpy as np
- import dolphindb as ddb
- pro=ts.pro_api('YOUR_TOKEN')
- s=ddb.session()
- s.connect("localhost",8941,"admin","123456")
- t1=s.loadTable(tableName="hushen_daily_line",dbPath="dfs://tushare")
- t2=s.loadTable(tableName="hushen_daily_indicator",dbPath="dfs://tushare")
- def dateRange(beginDate,endDate):
- dates=[]
- dt=datetime.datetime.strptime(beginDate,"%Y%m%d")
- date=beginDate[:]
- while date <= endDate:
- dates.append(date)
- dt=dt + datetime.timedelta(1)
- date=dt.strftime("%Y%m%d")
- return dates
- for dates in dateRange('20080101','20171231'):
- df=pro.daily(trade_date=dates)
- df['trade_date']=pd.to_datetime(df['trade_date'])
- if len(df):
- t1.append(s.table(data=df))
- print(t1.rows)
- for dates in dateRange('20080101','20171231'):
- ds=pro.daily(trade_date=dates)
- ds['trade_date']=pd.to_datetime(ds['trade_date'])
- ds['volume_ratio']=np.float64(ds['volume_ratio'])
- if len(ds):
- t2.append(s.table(data=ds))
- print(t2.rows)
查看数据量:
- select count(*) from hushen_daily_line
- 5,332,932
- select count(*) from hushen_daily_indicator
- 5,333,321