DolphinDB是一个分布式时序数据库,并且内置了丰富的计算和分析功能。它可以将TB级的海量数据存储在多台物理机器上,充分利用CPU,对海量数据进行高性能分析计算。通过Orca,我们可以在python环境中使用与pandas语法相同的脚本对DolphinDB分布式数据库中的数据进行复杂高效的计算。本教程主要介绍Orca对DolphinDB分布式表的操作。
本示例使用的是DolphinDB单机模式。首先,创建本教程的示例数据库dfs://orca_stock 。创建数据库的DolphinDB脚本如下所示:
login("admin","123456")if(existsDatabase("dfs://orca_stock")){dropDatabase("dfs://orca_stock")}dates=2019.01.01..2019.01.31syms="A"+string(1..30)sym_range=cutPoints(syms,3)db1=database("",VALUE,dates)db2=database("",RANGE,sym_range)db=database("dfs://orca_stock",COMPO,[db1,db2])n=10000000datetimes=2019.01.01T00:00:00..2019.01.31T23:59:59t=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(1000,n) as qty,rand(500.0,n) as price)trades=db.createPartitionedTable(t,`trades,`trade_time`sym).append!(t)n=200000datetimes=2019.01.01T00:00:00..2019.01.02T23:59:59syms="A"+string(1..30)t2=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(500.0,n) as bid,rand(500.0,n) as offer)quotes=db.createPartitionedTable(t2,`quotes,`trade_time`sym).append!(t2)syms="A"+string(1..30)t3=table(syms as sym,rand(0 1,30) as type)infos=db.createTable(t3,`infos).append!(t3)
注意:需要在DolphinDB database客户端或通过DolphinDB Python API创建分布式表,不能直接在Orca创建分布式表。
在Orca中通过connect
函数连接到DolphinDB服务器:
>>> import dolphindb.orca as orca>>> orca.connect("localhost",8848,"admin","123456")
用户需要根据实际情况修改IP地址和端口号。
1 读取分布式表
Orca通过read_table
函数读取分布式表,返回的结果是Orca DataFrame。例如:读取示例数据库dfs://orca_stock 中的表trades:
>>> trades = orca.read_table('dfs://orca_stock','trades')>>> type(trades)orca.core.frame.DataFrame
查看trades的列名:
>>> trades.columnsIndex(['trade_time', 'sym', 'qty', 'price'], dtype='object')
查看trades各列的数据类型:
>>> trades.dtypestrade_time datetime64[s]sym objectqty int32price float64dtype: object
查看trades的行数:
>>> len(trades)10000000
DolphinDB分布式表对应的Orca DataFrame只存储元数据,包括表名、数据的列名等信息。由于分布式表不是连续存储,各个分区之间没有严格的顺序关系,因此分布式表对应的DataFrame没有RangeIndex的概念。如果需要设置index,可以使用set_index
函数。例如,把trades中的trade_time设置为index:
>>> trades.set_index('trade_time')
如果要将index列转换为数据列,可以用reset_index
函数。
>>> trades.reset_index()
2 查询和计算
Orca采用惰性求值,某些计算不会立即在服务端计算,而是转换为一个中间表达式,直到真正需要时才发生计算。如果用户需要立即触发计算,可以调用compute
函数。
注意,示例数据库dfs://orca_stock 中的数据是随机生成的,因此用户的运行结果会与本章中的结果有所差异。
2.1 取前n条记录
head
函数可以查询前n条记录,默认取前5条。例如,取trades的前5条记录:
>>> trades.head() trade_time sym qty price0 2019-01-01 18:04:33 A16 855 482.5267691 2019-01-01 13:57:38 A12 244 61.6752932 2019-01-01 23:58:15 A10 36 297.6232953 2019-01-01 23:02:43 A16 426 109.0410124 2019-01-01 04:33:53 A1 472 75.778951
2.2 排序
sort_values
方法可以根据某列排序。例如,trades按照price降序排序,取前5条记录:
>>> trades.sort_values(by='price', ascending=False).head() trade_time sym qty price0 2019-01-03 12:56:09 A22 861 499.9999981 2019-01-18 17:25:21 A19 95 499.9999632 2019-01-30 02:18:48 A30 114 499.9999493 2019-01-23 08:31:56 A3 926 499.9999264 2019-01-20 03:36:53 A3 719 499.999892
按照多列排序:
>>> trades.sort_values(by=['qty','trade_time'], ascending=False).head() trade_time sym qty price0 2019-01-31 23:58:50 A24 999 359.8876971 2019-01-31 23:57:26 A3 999 420.1561752 2019-01-31 23:56:34 A2 999 455.2284353 2019-01-31 23:52:58 A6 999 210.8192274 2019-01-31 23:45:17 A14 999 310.813216
2.3 按照条件查询
Orca支持按照单个或多个条件多虑查询。例如,
查询trades中2019年1月2日的数据:
>>> tmp = trades[trades.trade_time.dt.date == "2019.01.01"]>>> tmp.head() trade_time sym qty price0 2019-01-01 00:32:21 A2 139 383.9712931 2019-01-01 21:19:09 A2 263 100.9325532 2019-01-01 18:50:48 A2 890 335.6144543 2019-01-01 23:29:16 A2 858 469.2239924 2019-01-01 09:58:51 A2 883 235.753424
查询trades中2019年1月30日,股票代码为A2的数据:
>>> tmp = trades[(trades.trade_time.dt.date == '2019.01.30') & (trades.sym == 'A2')]>>> tmp.head() trade_time sym qty price0 2019-01-30 04:41:56 A2 880 428.5526541 2019-01-30 14:13:53 A2 512 488.8269782 2019-01-30 14:31:28 A2 536 478.5782193 2019-01-30 04:09:41 A2 709 255.4359034 2019-01-30 13:18:50 A2 355 404.782260
2.4 groupby分组查询
groupby
函数用于分组聚合。以下函数都可以用于groupby对象:
count
:返回非NULL元素的个数sum
:求和mean
:均值min
:最小值max
:最大值mode
:众数abs
:绝对值prod
:乘积std
:标准差var
:方差sem
:平均值的标准误差skew
:倾斜度kurtosis
:峰度cumsum
:累积求和cumprod
:累积乘积cummax
:累积最大值cummin
:累积最小值
计算trades中每天的记录数:
>>> trades.groupby(trades.trade_time.dt.date)['sym'].count()trade_time2019-01-01 3225732019-01-02 3226622019-01-03 3231162019-01-04 3224362019-01-05 3221562019-01-06 3241912019-01-07 3218792019-01-08 3233192019-01-09 3222622019-01-10 3225852019-01-11 3229862019-01-12 3228392019-01-13 3223022019-01-14 3220322019-01-15 3224092019-01-16 3218102019-01-17 3215662019-01-18 3236512019-01-19 3234632019-01-20 3226752019-01-21 3228452019-01-22 3229312019-01-23 3225982019-01-24 3224042019-01-25 3224542019-01-26 3217602019-01-27 3219552019-01-28 3220132019-01-29 3227452019-01-30 3221932019-01-31 323190dtype: int64
计算trades中每天每只股票的记录数:
>>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].count()trade_time sym2019-01-01 A1 10638 A10 10747 A11 10709 A12 10715 A13 10914 ... 2019-01-31 A5 10717 A6 10934 A7 10963 A8 10907 A9 10815Length: 930, dtype: int64
Orca支持通过agg一次应用多个聚合函数。和pandas不同,Orca在agg中使用字符串来表示要调用的聚合函数。例如,对计算trades中每天价格的最大值、最小值和均值:
>>> trades.groupby(trades.trade_time.dt.date)['price'].agg(["min","max","avg"]) price min max avgtrade_time 2019-01-01 0.003263 499.999073 249.9136122019-01-02 0.000468 499.999533 249.9568742019-01-03 0.000054 499.999998 249.9272572019-01-04 0.000252 499.999762 249.9827372019-01-05 0.001907 499.999704 250.0974872019-01-06 0.000318 499.999824 249.9916052019-01-07 0.003196 499.999548 249.5605052019-01-08 0.000216 499.996703 250.0244052019-01-09 0.002635 499.998985 249.9664462019-01-10 0.000725 499.996717 249.6633242019-01-11 0.003140 499.998267 250.2437862019-01-12 0.000105 499.998453 250.0770612019-01-13 0.004297 499.999139 250.0974892019-01-14 0.003510 499.999452 249.7758302019-01-15 0.002501 499.999638 250.0212182019-01-16 0.000451 499.998059 250.0440592019-01-17 0.002359 499.998462 249.8089322019-01-18 0.000104 499.999963 249.9186512019-01-19 0.000999 499.998000 249.8994952019-01-20 0.000489 499.999892 249.6066682019-01-21 0.000729 499.999774 249.8398762019-01-22 0.000834 499.999331 249.6320372019-01-23 0.001982 499.999926 249.9550312019-01-24 0.000323 499.993956 249.5578512019-01-25 0.000978 499.999716 249.7220532019-01-26 0.002582 499.998753 249.8975192019-01-27 0.000547 499.999809 250.4046662019-01-28 0.002729 499.998545 249.6222892019-01-29 0.000487 499.999598 249.9501672019-01-30 0.000811 499.999949 250.1824932019-01-31 0.000801 499.999292 249.317517
Orca groupby支持过滤功能。和pandas不同,Orca中的过滤条件用字符串形式的表达式来表示,而不是lambda函数。
例如,返回trades中每天每只股票均价大于200,并且记录数大于11000的记录:
>>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].filter("avg(price) > 200 and count(price) > 11000")0 499.1711791 375.5530592 119.2408903 370.1985344 5.876941 ... 88416 37.87231788417 373.25978588418 435.15448488419 436.16380688420 428.455914Length: 88421, dtype: float64
2.5 resample重采样
Orca支持resample
函数,可以对常规时间序列数据重新采样和频率转换。目前,resample函数的参数如下:
- rule:DateOffset,可以是字符串或者是dateoffset对象
- on:时间列,采用该列进行重采样
- level:字符串或整数,对于MultiIndex,采用level指定的列进行重采样
Orca支持的DateOffset如下:
'B':BDay or BusinessDay'WOM':WeekOfMonth'LWOM':LastWeekOfMonth'M':MonthEnd'MS':MonthBegin'BM':BMonthEnd or BusinessMonthEnd'BMS':BMonthBegin or BusinessMonthBegin'SM':SemiMonthEnd'SMS':SemiMonthBegin'Q':QuarterEnd'QS':QuarterBegin'BQ':BQuarterEnd'BQS':BQuarterBegin'REQ':FY5253Quarter'A':YearEnd'AS' or 'BYS':YearBegin'BA':BYearEnd'BAS':BYearBegin'RE':FY5253'D':Day'H':Hour'T' or 'min':Minute'S':Second'L' or 'ms':Milli'U' or 'us':Micro'N':Nano
例如,对trades中的数据重新采样,每3分钟计算一次:
>>> trades.resample('3T', on='trade_time')['qty'].sum()trade_time2019-01-01 00:00:00 3210632019-01-01 00:03:00 3549172019-01-01 00:06:00 3294192019-01-01 00:09:00 3408802019-01-01 00:12:00 356612 ... 2019-01-31 23:45:00 3228292019-01-31 23:48:00 3447532019-01-31 23:51:00 3309592019-01-31 23:54:00 3367122019-01-31 23:57:00 328730Length: 14880, dtype: int64
如果trades设置了trade_time为index,也可以用以下方法重新采样:
>>> trades.resample('3T', level='trade_time')['qty'].sum()
如果要用dateoffset函数生成的对象来表示dateoffset,需要先导入pandas的dateoffset。按3分钟重新采样也可以使用以下写法:
>>> from pandas.tseries.offsets import *>>> ofst = Minute(n=3)>>> trades.resample(ofst,on='trade_time')['qty'].sum()
2.6 rolling移动窗口
Orca提供了rolling
函数,可以在移动窗口中做计算。目前,rolling
函数的参数如下:
- window::整型,表示窗口的长度
- on:字符串,根据该列来计算窗口
以下函数可用于orca.DataFrame.rolling对象:
count
:返回非NULL元素的个数sum
:求和min
:最小值max
:最大值std
:标准差var
:方差corr
:相关性covar
:协方差skew
:倾斜度kurtosis
:峰度
对于分布式表对应的DataFrame,在滑动窗口中计算时,是以分区为单位单独计算的,因此每个分区的计算结果的前window-1个值为空。例如,trades中2019.01.01和2019.01.02的数据在长度为3的滑动窗口中求price的和:
>>> tmp = trades[(trades.trade_time.dt.date == '2019.01.01') | (trades.trade_time.dt.date == '2019.01.02')]>>> re = tmp.rolling(window=3)['price'].sum()0 NaN1 NaN2 792.3866033 601.8263124 444.858366 ... 646057 1281.099161646058 1287.816045646059 963.262163646060 865.797011646061 719.050068Name: price, Length: 646062, dtype: float64
2.7 数据连接
Orca提供了连接DataFrame的功能。分布式表对应的DataFrame,既可以连接普通内存表对应的DataFrame,也可以连接分布式表对应的DataFrame。两个分布式表对应的DataFrame连接时必须同时满足以下条件:
- 两个分布式表在同一个数据库中
- 连接列必须包含所有分区列
Orca提供了merge
和join
函数。
merge
函数支持以下参数:
- right:Orca DataFrame或Series
- how:字符串,表示连接的类型,可以是left、right、outer和inner,默认值是inner
- on:字符串,表示连接列
- left_on:字符串,表示左表的连接列
- right_on:字符串,表示右表的连接列
- left_index:左表的索引
- right_index:右表的索引
- suffixes:字符串,表示重复列的后缀
join
函数是merge
函数的特例,它的参数及含义与merge
基本相同,只是join
默认为左外连接,即how='left'。
例如,对trades和quotes进行内连接:
>>> quotes = orca.read_table('dfs://orca_stock','quotes')>>> trades.merge(right=quotes, left_on=['trade_time','sym'], right_on=['trade_time','sym'], how='inner') trade_time sym qty price bid offer0 2019-01-01 02:36:34 A15 273 186.144261 317.458480 155.3616611 2019-01-01 05:37:59 A13 185 420.397500 248.447426 115.7228932 2019-01-01 00:59:43 A10 751 89.801687 193.925714 144.3454733 2019-01-01 21:58:36 A16 175 251.753495 116.810807 439.1782074 2019-01-01 10:53:54 A16 532 71.733640 240.927647 388.718680... ... ... ... ... ... ...25035 2019-01-02 03:59:51 A3 220 50.004418 107.905522 167.37599425036 2019-01-02 17:54:01 A3 202 195.189216 134.463906 142.44342825037 2019-01-02 16:57:50 A9 627 68.661644 440.421876 110.80107025038 2019-01-02 10:27:43 A28 414 487.337282 169.081363 261.17107325039 2019-01-02 17:02:51 A3 661 243.960836 92.999404 26.747609[25040 rows x 6 columns]
使用join
函数对trades和quotes进行左外连接:
>>> trades.set_index(['trade_time','sym'], inplace=True)>>> quotes.set_index(['trade_time','sym'], inplace=True)>>> trades.join(quotes) qty price bid offertrade_time sym 2019-01-01 18:04:25 A14 435 378.595626 NaN NaN2019-01-01 20:38:47 A13 701 275.039372 NaN NaN2019-01-01 02:43:03 A16 787 138.751605 NaN NaN2019-01-01 20:32:42 A14 989 188.035335 NaN NaN2019-01-01 16:59:16 A13 847 118.071427 NaN NaN... ... ... ... ...2019-01-31 17:21:27 A30 3 49.855063 NaN NaN2019-01-31 13:49:01 A6 273 245.966115 NaN NaN2019-01-31 16:42:29 A7 548 197.814548 NaN NaN2019-01-31 03:42:11 A5 563 263.999224 NaN NaN2019-01-31 20:48:57 A9 809 318.420522 NaN NaN[10000481 rows x 4 columns]
3 把dataframe追加到dfs表
Orca提供了append
函数,可以将Orca DataFrame追加到dfs表中。
append
函数具有以下参数:
- other:要追加的DataFrame
- ignore_index:布尔值,是否忽略索引。默认为False
- verify_integrity:布尔值。默认为False
- sort:布尔值,表示是否排序。默认为None
- inplace:布尔值,表示是否插入到dfs表。默认为False
例如,往dataframe追加到trades对应的分布式表:
>>> import pandas as pd>>> odf=orca.DataFrame({'trade_time':pd.date_range('20190101 12:30',periods=5,freq='T'), 'sym':['A1','A2','A3','A4','A5'], 'qty':[100,200,300,400,500], 'price':[100.5,263.1,254.9,215.1,245.6]})>>> trades.append(odf,inplace=True)>>> len(trades)10000005
Orca扩展了append函数,支持inplace参数,即允许就地添加数据。如果inplace为False,表现和pandas相同。分布式表中的内容会复制到内存中,此时trades对应的只是一个内存表,odf中的内容只追加到内存表,没有真正地追加到dfs表。
4 小结
对于分布式表,目前Orca还具有一些功能上的限制,例如分区表对应的DataFrame没有RangeIndex的概念、一些函数不支持在分布式表上使用以及修改表中数据的限制等。具体请参考Orca快速入门指导。