伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 教育考试 » 文章详细 订阅RssFeed

干货丨解析Orca对DolphinDB分布式表的操作

来源:本站原创 浏览:90次 时间:2022-12-31

DolphinDB是一个分布式时序数据库,并且内置了丰富的计算和分析功能。它可以将TB级的海量数据存储在多台物理机器上,充分利用CPU,对海量数据进行高性能分析计算。通过Orca,我们可以在python环境中使用与pandas语法相同的脚本对DolphinDB分布式数据库中的数据进行复杂高效的计算。本教程主要介绍Orca对DolphinDB分布式表的操作。

本示例使用的是DolphinDB单机模式。首先,创建本教程的示例数据库dfs://orca_stock 。创建数据库的DolphinDB脚本如下所示:

login("admin","123456")if(existsDatabase("dfs://orca_stock")){dropDatabase("dfs://orca_stock")}dates=2019.01.01..2019.01.31syms="A"+string(1..30)sym_range=cutPoints(syms,3)db1=database("",VALUE,dates)db2=database("",RANGE,sym_range)db=database("dfs://orca_stock",COMPO,[db1,db2])n=10000000datetimes=2019.01.01T00:00:00..2019.01.31T23:59:59t=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(1000,n) as qty,rand(500.0,n) as price)trades=db.createPartitionedTable(t,`trades,`trade_time`sym).append!(t)n=200000datetimes=2019.01.01T00:00:00..2019.01.02T23:59:59syms="A"+string(1..30)t2=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(500.0,n) as bid,rand(500.0,n) as offer)quotes=db.createPartitionedTable(t2,`quotes,`trade_time`sym).append!(t2)syms="A"+string(1..30)t3=table(syms as sym,rand(0 1,30) as type)infos=db.createTable(t3,`infos).append!(t3)
注意:需要在DolphinDB database客户端或通过DolphinDB Python API创建分布式表,不能直接在Orca创建分布式表。

在Orca中通过connect函数连接到DolphinDB服务器:

>>> import dolphindb.orca as orca>>> orca.connect("localhost",8848,"admin","123456")

用户需要根据实际情况修改IP地址和端口号。

1 读取分布式表

Orca通过read_table函数读取分布式表,返回的结果是Orca DataFrame。例如:读取示例数据库dfs://orca_stock 中的表trades:

>>> trades = orca.read_table('dfs://orca_stock','trades')>>> type(trades)orca.core.frame.DataFrame

查看trades的列名:

>>> trades.columnsIndex(['trade_time', 'sym', 'qty', 'price'], dtype='object')

查看trades各列的数据类型:

>>> trades.dtypestrade_time    datetime64[s]sym                  objectqty                   int32price               float64dtype: object

查看trades的行数:

>>> len(trades)10000000

DolphinDB分布式表对应的Orca DataFrame只存储元数据,包括表名、数据的列名等信息。由于分布式表不是连续存储,各个分区之间没有严格的顺序关系,因此分布式表对应的DataFrame没有RangeIndex的概念。如果需要设置index,可以使用set_index函数。例如,把trades中的trade_time设置为index:

>>> trades.set_index('trade_time')

如果要将index列转换为数据列,可以用reset_index函数。

>>> trades.reset_index()

2 查询和计算

Orca采用惰性求值,某些计算不会立即在服务端计算,而是转换为一个中间表达式,直到真正需要时才发生计算。如果用户需要立即触发计算,可以调用compute函数。

注意,示例数据库dfs://orca_stock 中的数据是随机生成的,因此用户的运行结果会与本章中的结果有所差异。

2.1 取前n条记录

head函数可以查询前n条记录,默认取前5条。例如,取trades的前5条记录:

>>> trades.head()           trade_time  sym  qty       price0 2019-01-01 18:04:33  A16  855  482.5267691 2019-01-01 13:57:38  A12  244   61.6752932 2019-01-01 23:58:15  A10   36  297.6232953 2019-01-01 23:02:43  A16  426  109.0410124 2019-01-01 04:33:53   A1  472   75.778951

2.2 排序

sort_values方法可以根据某列排序。例如,trades按照price降序排序,取前5条记录:

>>> trades.sort_values(by='price', ascending=False).head()           trade_time  sym  qty       price0 2019-01-03 12:56:09  A22  861  499.9999981 2019-01-18 17:25:21  A19   95  499.9999632 2019-01-30 02:18:48  A30  114  499.9999493 2019-01-23 08:31:56   A3  926  499.9999264 2019-01-20 03:36:53   A3  719  499.999892

按照多列排序:

>>> trades.sort_values(by=['qty','trade_time'], ascending=False).head()           trade_time  sym  qty       price0 2019-01-31 23:58:50  A24  999  359.8876971 2019-01-31 23:57:26   A3  999  420.1561752 2019-01-31 23:56:34   A2  999  455.2284353 2019-01-31 23:52:58   A6  999  210.8192274 2019-01-31 23:45:17  A14  999  310.813216

2.3 按照条件查询

Orca支持按照单个或多个条件多虑查询。例如,

查询trades中2019年1月2日的数据:

>>> tmp = trades[trades.trade_time.dt.date == "2019.01.01"]>>> tmp.head()           trade_time sym  qty       price0 2019-01-01 00:32:21  A2  139  383.9712931 2019-01-01 21:19:09  A2  263  100.9325532 2019-01-01 18:50:48  A2  890  335.6144543 2019-01-01 23:29:16  A2  858  469.2239924 2019-01-01 09:58:51  A2  883  235.753424

查询trades中2019年1月30日,股票代码为A2的数据:

>>> tmp = trades[(trades.trade_time.dt.date == '2019.01.30') & (trades.sym == 'A2')]>>> tmp.head()           trade_time sym  qty       price0 2019-01-30 04:41:56  A2  880  428.5526541 2019-01-30 14:13:53  A2  512  488.8269782 2019-01-30 14:31:28  A2  536  478.5782193 2019-01-30 04:09:41  A2  709  255.4359034 2019-01-30 13:18:50  A2  355  404.782260

2.4 groupby分组查询

groupby函数用于分组聚合。以下函数都可以用于groupby对象:

  • count:返回非NULL元素的个数
  • sum:求和
  • mean:均值
  • min:最小值
  • max:最大值
  • mode:众数
  • abs:绝对值
  • prod:乘积
  • std:标准差
  • var:方差
  • sem:平均值的标准误差
  • skew:倾斜度
  • kurtosis:峰度
  • cumsum:累积求和
  • cumprod:累积乘积
  • cummax:累积最大值
  • cummin:累积最小值

计算trades中每天的记录数:

>>> trades.groupby(trades.trade_time.dt.date)['sym'].count()trade_time2019-01-01    3225732019-01-02    3226622019-01-03    3231162019-01-04    3224362019-01-05    3221562019-01-06    3241912019-01-07    3218792019-01-08    3233192019-01-09    3222622019-01-10    3225852019-01-11    3229862019-01-12    3228392019-01-13    3223022019-01-14    3220322019-01-15    3224092019-01-16    3218102019-01-17    3215662019-01-18    3236512019-01-19    3234632019-01-20    3226752019-01-21    3228452019-01-22    3229312019-01-23    3225982019-01-24    3224042019-01-25    3224542019-01-26    3217602019-01-27    3219552019-01-28    3220132019-01-29    3227452019-01-30    3221932019-01-31    323190dtype: int64

计算trades中每天每只股票的记录数:

>>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].count()trade_time  sym2019-01-01  A1     10638            A10    10747            A11    10709            A12    10715            A13    10914                   ...  2019-01-31  A5     10717            A6     10934            A7     10963            A8     10907            A9     10815Length: 930, dtype: int64

Orca支持通过agg一次应用多个聚合函数。和pandas不同,Orca在agg中使用字符串来表示要调用的聚合函数。例如,对计算trades中每天价格的最大值、最小值和均值:

>>> trades.groupby(trades.trade_time.dt.date)['price'].agg(["min","max","avg"])               price                                         min         max         avgtrade_time                                  2019-01-01  0.003263  499.999073  249.9136122019-01-02  0.000468  499.999533  249.9568742019-01-03  0.000054  499.999998  249.9272572019-01-04  0.000252  499.999762  249.9827372019-01-05  0.001907  499.999704  250.0974872019-01-06  0.000318  499.999824  249.9916052019-01-07  0.003196  499.999548  249.5605052019-01-08  0.000216  499.996703  250.0244052019-01-09  0.002635  499.998985  249.9664462019-01-10  0.000725  499.996717  249.6633242019-01-11  0.003140  499.998267  250.2437862019-01-12  0.000105  499.998453  250.0770612019-01-13  0.004297  499.999139  250.0974892019-01-14  0.003510  499.999452  249.7758302019-01-15  0.002501  499.999638  250.0212182019-01-16  0.000451  499.998059  250.0440592019-01-17  0.002359  499.998462  249.8089322019-01-18  0.000104  499.999963  249.9186512019-01-19  0.000999  499.998000  249.8994952019-01-20  0.000489  499.999892  249.6066682019-01-21  0.000729  499.999774  249.8398762019-01-22  0.000834  499.999331  249.6320372019-01-23  0.001982  499.999926  249.9550312019-01-24  0.000323  499.993956  249.5578512019-01-25  0.000978  499.999716  249.7220532019-01-26  0.002582  499.998753  249.8975192019-01-27  0.000547  499.999809  250.4046662019-01-28  0.002729  499.998545  249.6222892019-01-29  0.000487  499.999598  249.9501672019-01-30  0.000811  499.999949  250.1824932019-01-31  0.000801  499.999292  249.317517

Orca groupby支持过滤功能。和pandas不同,Orca中的过滤条件用字符串形式的表达式来表示,而不是lambda函数。

例如,返回trades中每天每只股票均价大于200,并且记录数大于11000的记录:

>>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].filter("avg(price) > 200 and count(price) > 11000")0        499.1711791        375.5530592        119.2408903        370.1985344          5.876941            ...    88416     37.87231788417    373.25978588418    435.15448488419    436.16380688420    428.455914Length: 88421, dtype: float64

2.5 resample重采样

Orca支持resample函数,可以对常规时间序列数据重新采样和频率转换。目前,resample函数的参数如下:

  • rule:DateOffset,可以是字符串或者是dateoffset对象
  • on:时间列,采用该列进行重采样
  • level:字符串或整数,对于MultiIndex,采用level指定的列进行重采样

Orca支持的DateOffset如下:

'B':BDay or BusinessDay'WOM':WeekOfMonth'LWOM':LastWeekOfMonth'M':MonthEnd'MS':MonthBegin'BM':BMonthEnd or BusinessMonthEnd'BMS':BMonthBegin or BusinessMonthBegin'SM':SemiMonthEnd'SMS':SemiMonthBegin'Q':QuarterEnd'QS':QuarterBegin'BQ':BQuarterEnd'BQS':BQuarterBegin'REQ':FY5253Quarter'A':YearEnd'AS' or 'BYS':YearBegin'BA':BYearEnd'BAS':BYearBegin'RE':FY5253'D':Day'H':Hour'T' or 'min':Minute'S':Second'L' or 'ms':Milli'U' or 'us':Micro'N':Nano

例如,对trades中的数据重新采样,每3分钟计算一次:

>>> trades.resample('3T', on='trade_time')['qty'].sum()trade_time2019-01-01 00:00:00    3210632019-01-01 00:03:00    3549172019-01-01 00:06:00    3294192019-01-01 00:09:00    3408802019-01-01 00:12:00    356612                        ...  2019-01-31 23:45:00    3228292019-01-31 23:48:00    3447532019-01-31 23:51:00    3309592019-01-31 23:54:00    3367122019-01-31 23:57:00    328730Length: 14880, dtype: int64

如果trades设置了trade_time为index,也可以用以下方法重新采样:

>>> trades.resample('3T', level='trade_time')['qty'].sum()

如果要用dateoffset函数生成的对象来表示dateoffset,需要先导入pandas的dateoffset。按3分钟重新采样也可以使用以下写法:

>>> from pandas.tseries.offsets import *>>> ofst = Minute(n=3)>>> trades.resample(ofst,on='trade_time')['qty'].sum()

2.6 rolling移动窗口

Orca提供了rolling函数,可以在移动窗口中做计算。目前,rolling函数的参数如下:

  • window::整型,表示窗口的长度
  • on:字符串,根据该列来计算窗口

以下函数可用于orca.DataFrame.rolling对象:

  • count:返回非NULL元素的个数
  • sum:求和
  • min:最小值
  • max:最大值
  • std:标准差
  • var:方差
  • corr:相关性
  • covar:协方差
  • skew:倾斜度
  • kurtosis:峰度

对于分布式表对应的DataFrame,在滑动窗口中计算时,是以分区为单位单独计算的,因此每个分区的计算结果的前window-1个值为空。例如,trades中2019.01.01和2019.01.02的数据在长度为3的滑动窗口中求price的和:

>>> tmp = trades[(trades.trade_time.dt.date == '2019.01.01') | (trades.trade_time.dt.date == '2019.01.02')]>>> re = tmp.rolling(window=3)['price'].sum()0                 NaN1                 NaN2          792.3866033          601.8263124          444.858366             ...     646057    1281.099161646058    1287.816045646059     963.262163646060     865.797011646061     719.050068Name: price, Length: 646062, dtype: float64

2.7 数据连接

Orca提供了连接DataFrame的功能。分布式表对应的DataFrame,既可以连接普通内存表对应的DataFrame,也可以连接分布式表对应的DataFrame。两个分布式表对应的DataFrame连接时必须同时满足以下条件:

  • 两个分布式表在同一个数据库中
  • 连接列必须包含所有分区列

Orca提供了mergejoin函数。

merge函数支持以下参数:

  • right:Orca DataFrame或Series
  • how:字符串,表示连接的类型,可以是left、right、outer和inner,默认值是inner
  • on:字符串,表示连接列
  • left_on:字符串,表示左表的连接列
  • right_on:字符串,表示右表的连接列
  • left_index:左表的索引
  • right_index:右表的索引
  • suffixes:字符串,表示重复列的后缀

join函数是merge函数的特例,它的参数及含义与merge基本相同,只是join默认为左外连接,即how='left'。

例如,对trades和quotes进行内连接:

>>> quotes = orca.read_table('dfs://orca_stock','quotes')>>> trades.merge(right=quotes, left_on=['trade_time','sym'], right_on=['trade_time','sym'], how='inner')               trade_time  sym  qty       price         bid       offer0     2019-01-01 02:36:34  A15  273  186.144261  317.458480  155.3616611     2019-01-01 05:37:59  A13  185  420.397500  248.447426  115.7228932     2019-01-01 00:59:43  A10  751   89.801687  193.925714  144.3454733     2019-01-01 21:58:36  A16  175  251.753495  116.810807  439.1782074     2019-01-01 10:53:54  A16  532   71.733640  240.927647  388.718680...                   ...  ...  ...         ...         ...         ...25035 2019-01-02 03:59:51   A3  220   50.004418  107.905522  167.37599425036 2019-01-02 17:54:01   A3  202  195.189216  134.463906  142.44342825037 2019-01-02 16:57:50   A9  627   68.661644  440.421876  110.80107025038 2019-01-02 10:27:43  A28  414  487.337282  169.081363  261.17107325039 2019-01-02 17:02:51   A3  661  243.960836   92.999404   26.747609[25040 rows x 6 columns]

使用join函数对trades和quotes进行左外连接:

>>> trades.set_index(['trade_time','sym'], inplace=True)>>> quotes.set_index(['trade_time','sym'], inplace=True)>>> trades.join(quotes)                         qty       price  bid  offertrade_time          sym                             2019-01-01 18:04:25 A14  435  378.595626  NaN    NaN2019-01-01 20:38:47 A13  701  275.039372  NaN    NaN2019-01-01 02:43:03 A16  787  138.751605  NaN    NaN2019-01-01 20:32:42 A14  989  188.035335  NaN    NaN2019-01-01 16:59:16 A13  847  118.071427  NaN    NaN...                      ...         ...  ...    ...2019-01-31 17:21:27 A30    3   49.855063  NaN    NaN2019-01-31 13:49:01 A6   273  245.966115  NaN    NaN2019-01-31 16:42:29 A7   548  197.814548  NaN    NaN2019-01-31 03:42:11 A5   563  263.999224  NaN    NaN2019-01-31 20:48:57 A9   809  318.420522  NaN    NaN[10000481 rows x 4 columns]

3 把dataframe追加到dfs表

Orca提供了append函数,可以将Orca DataFrame追加到dfs表中。

append函数具有以下参数:

  • other:要追加的DataFrame
  • ignore_index:布尔值,是否忽略索引。默认为False
  • verify_integrity:布尔值。默认为False
  • sort:布尔值,表示是否排序。默认为None
  • inplace:布尔值,表示是否插入到dfs表。默认为False

例如,往dataframe追加到trades对应的分布式表:

>>> import pandas as pd>>> odf=orca.DataFrame({'trade_time':pd.date_range('20190101 12:30',periods=5,freq='T'),                   'sym':['A1','A2','A3','A4','A5'],                   'qty':[100,200,300,400,500],                   'price':[100.5,263.1,254.9,215.1,245.6]})>>> trades.append(odf,inplace=True)>>> len(trades)10000005

Orca扩展了append函数,支持inplace参数,即允许就地添加数据。如果inplace为False,表现和pandas相同。分布式表中的内容会复制到内存中,此时trades对应的只是一个内存表,odf中的内容只追加到内存表,没有真正地追加到dfs表。

4 小结

对于分布式表,目前Orca还具有一些功能上的限制,例如分区表对应的DataFrame没有RangeIndex的概念、一些函数不支持在分布式表上使用以及修改表中数据的限制等。具体请参考Orca快速入门指导。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net