DolphinDB可以在Python中调用,大大降低了时序数据库的使用门槛。
DolphinDB Python API实质是封装了DolphinDB的脚本语言1(也就是前面11次案例中使用到的语言)。Python代码被转换成DolphinDB脚本在DolphinDB服务器执行,执行结果保存到DolphinDB服务器或者序列化到Python客户端。
只是将Python语言转换成DolphinDB脚本,但并不在DolphinDB服务器上执行。
即将Python语言转成DolphinDB脚本语言,又在DolphinDB服务器上运行。
触发脚本执行的方法,都是table类的方法。如下:
方法名详情connect(host, port, [username, password])将会话连接到DolphinDB服务器。toDF()把DolphinDB表对象转换成pandas的Dataframe对象。executeAs(tableName)执行结果保存为指定表名的内存表。execute()执行脚本。与update和delete一起使用。database(dbPath, …)创建或加载数据库。dropDatabase(dbPath)删除数据库。dropPartition(dbPath, partitionPaths)删除数据库的某个分区。dropTable(dbPath, tableName)删除数据库中的表。drop(colNameList)删除表中的某列。ols(Y, X, intercept)计算普通最小二乘回归,返回结果是一个字典。Python通过会话与DolphinDB进行交互,就像MySQL与Python的engine一样。
建立会话之前,需要把目标服务器先打开,即服务器的端口呈开放状态。
# 创建会话,建立连接 s = ddb.session() s.connect("localhost", 8920) # # 如需用户名和密码 # s.connect("localhost", 8848, admin, 123456)如果连接成功,运行后不会返回信息。想来是遵循无消息就是最好的消息原则。
如果,连接失败,会返回错误码。
如果采用单节点部署,默认端口8848;如果采用单服务器集群部署,默认端口8920.
在API使用期间,如果与服务器连接暂时中断,API会进行重新连接,并执行之前未成功运行的脚本。
重新连接会获得一个新的会话,在新会话执行之前连接未成功运行的脚本B之前,可以通过下述代码设置一个任务A,先执行完A再执行未成功运行的B。
import dolphindb as ddb s = ddb.session() s.setInitScript("initTable = streamTable(10000:0, `id`val, [INT,LONG])") currentInitScript = s.getInitScript()DolphinDB数据库根据存储方式可以分成3种类型:
内存数据库本地文件系统数据库分布式文件系统(DFS)的数据库关于三种类型的基本概念,参见《内存数据库、磁盘数据库、分布式数据库区别》《数据库分区、分表、分库、分片》。
本文先以本地文件系统的数据库为例。
此处使用example.csv数据,此数据官网有提供,不过下载网速较慢,各位老板如有需要,可以留言,我发给各位。
loadText方法把文本文件载入到DolphinDB的内存表种,并返回一个DolphinDB内存表对象。
>> trade = s.loadText('D:/DolphinDB/Python/example.csv') # Windows中路径需要用反斜杠,与常规Python应用不同 >> print(trade) >> print(type(trade)) <dolphindb.table.Table object at 0x000001CED8779BA8> <class 'dolphindb.table.Table'>当然,这里也可以使用ploadText,速度会翻倍。
对比载入一个840M股票数据csv文件:
方式时间pd.read_csv01:02.877loadText00:33.288ploadText00:16.969详见《DolphinDB与pandas读取csv文件速度对比测试》
toDF方法把Python中的DolphinDB对象转换成pandas DataFrame对象。
>> df = trade.toDF() >> print(df) >> print(type(df)) TICKER date VOL PRC BID ASK 0 AMZN 1997-05-15 6029815 23.50000 23.50000 23.6250 1 AMZN 1997-05-16 1232226 20.75000 20.50000 21.0000 2 AMZN 1997-05-19 512070 20.50000 20.50000 20.6250 ... 13133 NFLX 2016-12-28 4388956 125.89000 125.88000 125.8900 13134 NFLX 2016-12-29 3444729 125.33000 125.31000 125.3300 13135 NFLX 2016-12-30 4455012 123.80000 123.80000 123.8300 [13136 rows x 6 columns] <class 'pandas.core.frame.DataFrame'>如果数据文件比可用内存大,可以把数据导入到分区数据库中。
分区方案:值分区;
分区字段:example.csv文件中有3只股票,使用股票代码作为分区字段。
s.database('db', partitionType= 'VALUE', partitions=["AMZN", "NFLX", "NVDA"], dbPath='D:/DolphinDB/Python/valuedb1') # 等同于下面这种分区方案设定方法 s.database('db', partitionType= ddb.settings.VALUE, partitions=["AMZN", "NFLX", "NVDA"], dbPath='D:/DolphinDB/Python/valuedb1')要点:文件路径要用/,否则数据库创建不成功,但是并不会报错
创建分布式分区数据库
s.database('db', partitionType='VALUE', partitions=["AMZN","NFLX", "NVDA"], dbPath="dfs://valuedbdfs")与分区数据库相比,只是存储路径dbPath不同,这里valuedbdfs在数据节点文件夹下面。
其他分区方式
除了值分区(VALUE),DolphinDB还支持顺序分区(SEQ)、哈希分区(HASH)、范围分区(RANGE)、列表分区(LIST)与组合分区(COMBO)。
概念及区别详情参见《DolphinDB使用案例2:数据表分区》
创建数据库后,可使用函数loadTextEx把文本文件导入到分区数据库的分区表中。
如果分区表不存在,函数会自动生成该分区表,并把数据追加到表中;
如果分区表已存在,则直接把数据追加到分区表中。
loadTextEx函数返回一个包含载入元素的DolphinDB表对象。可通过toDF()函数转成pandas.DataFrame,前文有讲。
# 创建分区数据库 s.database('db', partitionType= ddb.settings.VALUE, partitions=["AMZN", "NFLX", "NVDA"], dbPath='D:/DolphinDB/Python/valuedb') # 创建分区表,并载入数据 trade = s.loadTextEx('db', tableName= 'trade', partitionColumns= ['TICKER'], filePath= 'D:/DolphinDB/Python/example.csv') # 查看表行数 print(trade.rows) # 查看表列数 print(trade.cols) # 查看表结构 print(trade.schema) # 查看数据库中已经存在的表 table = s.table(dbPath='D:/DolphinDB/Python/valuedb', data= "trade")上面这段代码,每运行一次,就会发现行数(trade.rows)增多,因为如果表在分布式数据库中存在,就会把数据追加到表上。
要解决这个问题,存在两个方案:
可以先判断是否存在重名数据库,如果存在就删除,重新创建。
if s.existsDatabase(WORK_DIR+"/valuedb"): s.dropDatabase(WORK_DIR+"/valuedb")这种方式在数据库不重要的时候没问题,如果数据库是需要长期存在的,这样就太粗鲁了。
创建含主键的流数据表。keyedStreamTable函数。
需要理清逻辑关系,keyedStreamTable函数是直接创建一个包含主键的表,并不能给一个已经创建好的普通表指定主键。
这就要求使用者在使用之处就做好使用规划。
先有数据库,再有表,这个逻辑是不变的。
# 第一步:创建值分区内存数据库 s.database('db', partitionType= ddb.settings.VALUE, partitions=['AMZN', 'NFLX', 'NVDA'], dbPath='') # 第二步:讲数据导入内存的分区表中 trade = s.loadTextEx(dbPath='db', partitionColumns= ['TICKER'], tableName='trade', filePath='D:/DolphinDB/Python/example.csv')返回一个DolphinDB表
ploadText函数可以并行加载文本文件到内存分区表中。
trade = s.ploadText('D:/DolphinDB/Python/example.csv')ploadText函数直接载入内存表,并不需要提前创建内存数据库。
loadText函数也一样,只是比ploadText慢一点。
上传数据时,Python中的一些基础类型,如bool,int64,float64,会自动转换为DolphinDB的BOOL,INT,DOUBLE类型。
时间类型需要做特殊处理。
DolphinDB提供DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP九种类型的时间类型。
Python中时间类型均为datetime64类型,会被转换成DolphinDB的NANOTIMESTAMP类型。
Python API提供了from_time,from_date,from_datetime方法,能把datetime64类型转换成DolphinDB的各种时间类型。具体对应参见:
DolphinDB时间类型例子上传到DolphinDB的结果DATEDate.from_date(date(2012,12,20))2012.12.20MONTHMonth.from_date(date(2012,12,26))2012.12MTIMETime.from_time(time(12,30,30,8))12:30:30.008MINUTEMinute.from_time(time(12,30))12:30mSECONDSecond.from_time(time(12,30,30))12:30:30DATETIMEDatetime.from_datetime(datetime(2012,12,30,15,12,30))2012.12.30 15:12:30TIMESTAMPTimestamp.from_datetime(datetime(2012,12,30,15,12,30,8))2012.12.30 15:12:30.008NANOTIMENanoTime.from_time(time(13,30,10,706))13:30:10.000706000NANOTIMESTAMPNanoTimestamp.from_datetime(datetime(2012,12,24,13,30,10,80706))2012.12.24 13:30:10.080706000Python中的np.NaN是特殊的float数据类型,上传时,DolphinDB会把他们识别为float。
此函数,可在上传DataFrame之前指定一个或多个列在dolphinDB中的数据类型。
dataframe:Python中的DataFrame类型;
dict:Python中字典对象,key表示dataframe中某列的名称;value表示DolphinDB的数据类型,只能取:ddb.DT_BOOL、ddb.DT_INT、ddb.DT_LONG、ddb.DT_DOUBLE。
ddb.overwriteTypes(t,{'isBuyer':ddb.DT_BOOL})上传字典或DataFrame时,同一列中不能同时包含Python的原生类型和DolphinDB Python API提供的类型。
例如:‘date’:[date(2012,12,30), Date.from_date(date(2012,12,31)), Date.null()], date列同时包含Python的datetime64类型和DolphinDB Python API提供的DATE类型,会导致上传失败。
upload可以把Python对象上传到DolphinDB服务器。upload函数的输入是Python的字典对象:
{'DolphinDB中的变量名' : 'Python对象'}
Python中像a=[1,2,3.0]这种类型的内置list,上传到DolphinDB后,会被识别为any vector。这种情况下,建议使用np.array代替内置list,即a=np.array([1,2,3.0],dtype=np.double),这样a会被识别为double类型的向量。
table函数用于在Python中创建DolphinDB表对象。
table函数的输入可以是字典、DataFrame、DolphinDB中的表名。
# 第一步:创建table表 dt = s.table(data={'id':[1, 2, 2, 3], 'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'], 'price': [22, 3.5, 21, 26]}).executeAs("test") # 方式一 dt = s.table(data={'id':[1, 2, 2, 3], 'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'], 'price': [22, 3.5, 21, 26]}, tableAliasName="testDict") # 方式二 # 第二步:加载table表 result = s.loadTable("test")实质是table用data数据在s中创建了一个表,以这种方式实现了数据上传。
有时需要使用Python API来向DolphinDB服务器的分区表中追加数据。
# 第一步:在DolphinDB服务器端创建数据表(这段代码在DolphinDB客户端执行) if(existsDatabase("dfs://testPython")){ dropDatabase("dfs://testPython") } db = database("dfs://testPython", VALUE, 1..100) t1 = table(10000:0,`id`cbool`cchar`cshort`cint`clong`cdate`cmonth`ctime`cminute`csecond`cdatetime`ctimestamp`cnanotime`cnanotimestamp`cfloat`cdouble`csymbol`cstring,[INT,BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,SYMBOL,STRING]) insert into t1 values (0,true,'a',122h,21,22l,2012.06.12,2012.06M,13:10:10.008,13:30m,13:30:10,2012.06.13 13:30:10,2012.06.13 13:30:10.008,13:30:10.008007006,2012.06.13 13:30:10.008007006,2.1f,2.1,"","") db.createPartitionedTable(t1, `t1, `id)<<未完待续<<
loadTable函数从数据库中加载表。
参数tableName:表示分区表的名称;参数dbPath:表示数据库的路径,如果不指定,则加载到内存中;参数memoryMode:对于分区表, true且未指定partition参数,把表中的所有数据加载到内存的分区表中;ture且指定partition参数,则之家在指定的分区数据到内存的分区表中;false,只把元数据加载到内存。 # 示例一:加载整个表的数据 trade = s.loadTable(tableName="trade", dbPath= "D:/DolphinDB/Python/valuedb") # 示例二:加载指定分区的数据 trade = s.loadTable(tableName= "trade", dbPath= "D:/DolphinDB/Python/valuedb", partitions= "AMZN") # 选择一个分区 trade = s.loadTable(tableName= "trade", dbPath= "D:/DolphinDB/Python/valuedb", partitions= ["AMZN","NVDA"]) # 选择两个分区loadTableBySQL函数把磁盘上的分区表中满足SQL语句过滤条件的数据加载到内存的分区表中。
# 去除重名数据库 if s.existsDatabase("D:/DolphinDB/Python/valuedb" or os.path.exists("D:/DolphinDB/Python/valuedb")): s.dropDatabase("D:/DolphinDB/Python/valuedb") # 新建数据库 s.database(dbName= 'db', partitionType= keys.VALUE, partitions= ["AMZN", "NFLX", "NVDA"], dbPath= "D:/DolphinDB/Python/valuedb") # 将数据载入数据库 t = s.loadTextEx("db", tableName= 'trade', partitionColumns= ['TICKER'], filePath= "D:/DolphinDB/Python/example.csv") # 查询符合SQL条件的数据,载入内存 trade = s.loadTableBySQL(tableName= "trade", dbPath= "D:/DolphinDB/Python/valuedb", sql= "select * from trade where date>2010.01.01") # 查看trade表的行数 dsc(trade.rows)从DolphinDB下载数据到python时的数据转换
DolphinDB Python API使用Python原生的各种形式的数据兑现过来存放DolphinDB服务端返回的数据,下面给出DolphinDB的数据对象到Python的数据对象的映射关系:
DolphinDBPythonDolphinDB生成数据Python数据scalarNumbers, Strings, NumPy.datetime64见6.3.2小节见6.3.2小节vectorNumPy.array1…3[1 2 3]pairLists1:5[1, 5]matrixLists1…6$2:3[array([[1, 3, 5],[2, 4, 6]], dtype=int32), None, None]setSetsset(3 5 4 6){3, 4, 5, 6}dictionaryDictionariesdict([‘IBM’,‘MS’,‘ORCL’], 170.5 56.2 49.5){‘MS’: 56.2, ‘IBM’: 170.5, ‘ORCL’: 49.5}tablepandas.DataFame见第6.1小节见第6.1小节DolphinDB 表通过toDF()函数转成Python数据,对应转换关系为:
DolphinDB类型Python类型DolphinDB数据Python数据BOOLbool[true,00b][True, nan]CHARint64[12c,00c][12, nan]SHORTint64[12,00h][12, nan]INTint64[12,00i][12, nan]LONGint64[12l,00l][12, nan]DOUBLEfloat64[3.5,00F][3.5,nan]FLOATfloat64[3.5,00f][3.5, nan]SYMBOLobjectsymbol([“AAPL”,NULL])[“AAPL”,""]STRINGobject[“AAPL”,string()][“AAPL”, “”]DATEdatetime64[2012.6.12,date()][2012-06-12, NaT]MONTHdatetime64[2012.06M, month()][2012-06-01, NaT]TIMEdatetime64[13:10:10.008,time()][1970-01-01 13:10:10.008, NaT]MINUTEdatetime64[13:30,minute()][1970-01-01 13:30:00, NaT]SECONDdatetime64[13:30:10,second()][1970-01-01 13:30:10, NaT]DATETIMEdatetime64[2012.06.13 13:30:10,datetime()][2012-06-13 13:30:10,NaT]TIMESTAMPdatetime64[2012.06.13 13:30:10.008,timestamp()][2012-06-13 13:30:10.008,NaT]NANOTIMEdatetime64[13:30:10.008007006, nanotime()][1970-01-01 13:30:10.008007006,NaT]NANOTIMESTAMPdatetime64[2012.06.13 13:30:10.008007006,nanotimestamp()][2012-06-13 13:30:10.008007006,NaT]其中:
DolphinDB CHAR类型会被转换成Python int64类型。对此结果,用户可以使用Python的chr函数使之转换为字符。由于Python pandas中所有有关时间的数据类型均为datetime64,DolphinDB中的所有时间类型数据均会被转换为datetime64类型。MONTH类型,如2012.06M,会被转换为2012-06-01(即月份当月的第一天)。TIME, MINUTE, SECOND与NANOTIME类型不包含日期信息,转换时会自动添加1970-01-01,例如13:30m会被转换为1970-01-01 13:30:00。缺失值处理:DolphinDB中的逻辑型、数值型和时序类型的NULL值默认情况下是NaN、NaT,字符串的NULL值为空字符串。DolphinDB的内存表并不提供数据类型自动转换的功能,因此在向内存表追加数据时,需要在服务端显式地调用时间转换函数对事件类型的列进行转换,首先要确保插入的数据类型与内存表schema中的数据类型保持一致。
# 使用insert into语句一次性插入多条数据 rowNum = 5 ids = np.arange(1, rowNum+1, 1, dtype=np.int32) dates = np.array(pd.date_range('4/1/2019', periods=rowNum), dtype='datetime64[D]') tickers = np.repeat("AA", rowNum) prices = np.arange(1, 0.6*(rowNum+1), 0.6, dtype=np.float64) s.upload({'ids':ids, "dates":dates, "tickers":tickers, "prices":prices}) script = "insert into tglobal values(ids,dates,tickers,prices);" s.run(script)其中,date_range()函数的dtype参数为datetime64[D],生成了只含有日期的时间列,与DolphinDB中的date类型保持一致,因此可以直接通过insert插入,不需要转换。
如果这里的时间数据类型式datetime64,则需要如下转换才能追加到内存表:
script = "insert into tglobal value(ids, date(dates), tickers, prices);" s.run(script)如果Python程序获取的数据可以组织成list形式,且保证数据类型正确的前提下,可以通过tableInsert函数来批量保存多条数据。
此方式优点在于在一次访问服务器请求中将上传数据对象和追加数据这两个步骤一次性完成,相比于insert into减少了一次访问DolphinDB服务器的请求。
# tableInsert批量追加多条数据 args = [ids, dates, tickers, prices] s.run("tableInsert{tglobal}", args) s.run("tglobal")tableInsert函数除了可以追加多条数据之外,还可以直接追加一个表,其中,时间列仍然需要特殊说明。
直接通过部分应用的方式,将一个DataFrame直接上传到服务器并追加到内存表。
# --如果表中没有时间列 import pandas as pd # 生成内存表 script = """t = table(1:0,`id`ticker`price, [INT,STRING,DOUBLE]) share t as tdglobal""" s.run(script) # 生成要追加的DataFrame tb=pd.DataFrame({'id': [1, 2, 2, 3], 'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'], 'price': [22, 3.5, 21, 26]}) s.run("tableInsert{tdglobal}",tb)Python pandas中所有有关时间的数据类型均为datetime64,DolphinDB表中时间类型共9种。
因此在追加一个带有时间列的表时,需要:
先将Dataframe上传到服务器;通过select语句将表中的每一列都选出来;每一个时间列进行时间类型转换;构成一个新表;将这个信标追加到内存表种。 # --表中有时间列 import pandas as pd tb=pd.DataFrame({'id': [1, 2, 2, 3], 'date': np.array(['2019-02-04', '2019-02-05', '2019-02-09', '2019-02-13'], dtype='datetime64[D]'), 'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'], 'price': [22, 3.5, 21, 26]}) s.upload({'tb':tb}) s.run("tb1=table((exec id from tb) as id, (exec date(date) from tb) as date, (exec ticker from tb) as ticker, (exec price from tb) as price)") s.run("tableInsert(tglobal,tb1)")可以实现,但不推荐。
因为append!函数会返回一个表的schema,增加通信量。
此时,并不返回信息
本地磁盘表通常用于静态数据集的计算分析,既可以用于数据的输入,也可以作为计算的输出。它不支持事务,也不持支并发读写。
# 生成磁盘表 dbPath="'D:/DolphinDB/Python/valuedb'" tableName='dt' script = """t = table(100:0, `id`date`ticker`price, [INT,DATE,STRING,DOUBLE]); db = database({db}); saveTable(db, t, `{tb}); share t as tDiskGlobal;""".format(db=dbPath,tb=tableName) s.run(script)其中:
databae函数创建数据库;
saveTable函数将内存表保存到磁盘中;
tableInsert函数时向本地磁盘表追加数据最为常用的方式。本质上,tableInsert时将数据插入到内存表,需要使用saveTable函数将插入的数据保存到磁盘上。
# 在上述代码的基础上,运行此代码 # tableInsert将数据保存到本地磁盘表 rowNum = 5 ids = np.arange(1, rowNum+1, 1, dtype=np.int32) dates = np.array(pd.date_range('4/1/2019', periods=rowNum), dtype='datetime64[D]') tickers = np.repeat("AA", rowNum) prices = np.arange(1, 0.6*(rowNum+1), 0.6, dtype=np.float64) args = [ids, dates, tickers, prices] s.run("tableInsert{tDiskGlobal}", args) s.run("saveTable(db,tDiskGlobal,`{tb});".format(tb=tableName))与追加表到内存表类似,本地磁盘表也支持通过tableInsert函数和append!函数直接追加一个表,同样也需要区分有无时间列的情况,唯一的区别是,本地磁盘表在追加之后要执行saveTable函数来保存到磁盘上,具体操作过程不再赘述。
或者可以理解为,insert into、tableInsert、append!追加一条、多条、一个表到即有表上,都是追加到内存表,如果需要到磁盘表,再使用saveTable函数来保存。
分布式表是DolphinDB推荐在生产环境下使用的数据存储方式,
它支持快照级别的事务隔离,保证数据一致性;分布式表支持多副本机制,既提供了数据容错能力,又能作为数据访问的负载均衡。只有启用enableDFS=1的集群环境才能使用分布式表。
# 生成分布式表---此处需要连接数据节点(8921),不能是控制节点(8920) s = ddb.session() s.connect("localhost", 8921, 'admin', '123456') dbPath="'dfs://testPython'" tableName='t1' script = """ dbPath={db} if(existsDatabase(dbPath)) dropDatabase(dbPath) db = database(dbPath, VALUE, 0..100) t1 = table(10000:0,`id`cbool`cchar`cshort`cint`clong`cdate`cmonth`ctime`cminute`csecond`cdatetime`ctimestamp`cnanotime`cnanotimestamp`cfloat`cdouble`csymbol`cstring,[INT,BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,SYMBOL,STRING]) insert into t1 values (0,true,'a',122h,21,22l,2012.06.12,2012.06M,13:10:10.008,13:30m,13:30:10,2012.06.13 13:30:10,2012.06.13 13:30:10.008,13:30:10.008007006,2012.06.13 13:30:10.008007006,2.1f,2.1,'','') t = db.createPartitionedTable(t1, `{tb}, `id) t.append!(t1)""".format(db=dbPath,tb=tableName) s.run(script)DolphinDB提供loadTable方法来加载分布式表,通过tableInsert方式追加数据、append!追加表。
与内存表和磁盘表不同的是,分布式表在追加表的时候提供时间类型自动转换的功能,因此无需显示地进行类型转换。
更多方法见session.py
更多方法见table.py
以上3种方式都等价于在DolphinDB服务端调用table方法创建一个名为’tb’的内存数据表
showSQL()函数一样使用
groupby后面需要使用聚合函数,如count、sum、agg、agg2、having
contextby与groupby相似,区别在于groupby为每个组返回一个标量,但是contextby为每个组返回一个向量。每组返回的向量长度与这一组的行数相同。
merge用于内部连接、左连接、外部连接;
merge_asof表示asof join;
merge_window表示窗口连接。
对于merge,如果连接列名称相同,使用on参数指定连接列,如果连接列名称不同,使用left_on和right_on参数指定连接列。可选参数how表示表连接的类型。默认的连接类型时内部连接。分区表只能与分区表进行外部链接,内存表只能与内存表进行外部链接。
《DolphinDB使用案例16:流数据订阅》
Github Tutorials >> Python_api.md待更新 ↩︎
Github python3_api_experimental/README.md ↩︎