DolphinDB海量数据查询:分页与采样
目录
- 摘要
- 一、海量数据查询挑战
- 1.1 海量数据查询问题
- 1.2 解决方案
- 二、分页查询
- 2.1 LIMIT分页
- 2.2 TOP分页
- 2.3 分页函数
- 2.4 分布式表分页
- 2.5 分页最佳实践
- 三、数据采样
- 3.1 随机采样
- 3.2 系统采样
- 3.3 分层采样
- 3.4 时间采样
- 3.5 采样函数
- 四、结果缓存
- 4.1 内存缓存
- 4.2 共享表缓存
- 4.3 缓存更新策略
- 五、异步查询
- 5.1 异步查询原理
- 5.2 异步查询实现
- 六、查询优化技巧
- 6.1 减少返回列
- 6.2 使用分区条件
- 6.3 避免深分页
- 6.4 分批处理
- 七、实战案例
- 7.1 设备数据分页查询API
- 7.2 数据采样分析
- 八、总结
- 参考资料
摘要
本文深入讲解DolphinDB海量数据查询技术。从分页查询到数据采样,从结果缓存到查询优化,全面介绍处理海量数据的核心方法。通过丰富的代码示例,帮助读者掌握海量数据查询的核心技能。
一、海量数据查询挑战
1.1 海量数据查询问题
工业物联网场景下,数据量巨大,查询面临挑战:
1.2 解决方案
| 方案 | 说明 | 适用场景 |
|---|---|---|
| 分页查询 | 分批返回结果 | 列表展示 |
| 数据采样 | 采样部分数据 | 数据分析 |
| 结果缓存 | 缓存查询结果 | 重复查询 |
| 异步查询 | 异步执行查询 | 长时间查询 |
二、分页查询
2.1 LIMIT分页
//创建测试数据 t=table(1..10000asid,2024.01.01+0..9999asdate,rand(20.0..30.0,10000)astemperature,rand(40.0..60.0,10000)ashumidity)//基本分页:每页100条 select*fromt limit100//分页查询:第2页(偏移100条) select*fromt limit100offset100//分页查询:第3页 select*fromt limit100offset2002.2 TOP分页
//TOP分页:返回前N条 select top100*fromt//TOP分页带排序 select top100*fromt order byiddesc2.3 分页函数
//分页查询函数defpaginateQuery(table,pageSize,pageNum){offset=pageSize*(pageNum-1)returnselect*fromtable limit pageSize offset offset}//使用分页函数 t=table(1..10000asid,rand(100.0,10000)asvalue)//第1页 paginateQuery(t,100,1)//第5页 paginateQuery(t,100,5)//第10页 paginateQuery(t,100,10)2.4 分布式表分页
//创建分布式表 db=database("dfs://page_db",VALUE,1..100)schema=table(1:0,`device_id`timestamp`temperature`humidity,[INT,TIMESTAMP,DOUBLE,DOUBLE])db.createPartitionedTable(schema,`sensor_data,`device_id)//插入数据 loadTable("dfs://page_db","sensor_data").append!(table(take(1..100,100000)asdevice_id,take(now(),100000)astimestamp,rand(20.0..30.0,100000)astemperature,rand(40.0..60.0,100000)ashumidity))//分布式表分页查询 t=loadTable("dfs://page_db","sensor_data")//第1页 select*fromt limit100//第2页(注意:分布式表offset可能较慢) select*fromt limit100offset100//优化:使用分区条件 select*fromt where device_id=1limit1002.5 分页最佳实践
| 实践 | 说明 |
|---|---|
| 使用分区条件 | 减少扫描数据量 |
| 避免大offset | 大offset性能差 |
| 使用游标 | 替代大offset |
| 限制页数 | 避免深分页 |
三、数据采样
3.1 随机采样
//创建测试数据 t=table(1..100000asid,rand(100.0,100000)asvalue)//随机采样:采样10%select*fromt sample0.1//随机采样:采样1000条 select*fromt sample10003.2 系统采样
//系统采样:每隔N条取一条 select*fromt whereid%100=0//系统采样:每隔1000条取一条 select*fromt whereid%1000=03.3 分层采样
//创建带分类的数据 t=table(take(1..10,100000)asgroup_id,rand(100.0,100000)asvalue)//分层采样:每组采样100条 select*from(select*,row_number()over(partition by group_id order by rand())asrnfromt)where rn<=1003.4 时间采样
//创建时间序列数据 t=table(2024.01.01T00:00:00+0..99999*60000astimestamp,//每分钟 rand(20.0..30.0,100000)astemperature)//时间采样:每小时取一条 select*fromt where minute(timestamp)=0//时间采样:使用bar函数 select bar(timestamp,1h)ashour,first(temperature)assample_tempfromt group by bar(timestamp,1h)3.5 采样函数
//采样函数封装defsampleData(table,sampleRate){returnselect*fromtable sample sampleRate}defsampleByGroup(table,groupCol,sampleSize){returnselect*from(select*,row_number()over(partition by groupCol order by rand())asrnfromtable)where rn<=sampleSize}//使用采样函数 t=table(take(1..10,10000)asgroup_id,rand(100.0,10000)asvalue)//随机采样10%sampleData(t,0.1)//分组采样:每组100条 sampleByGroup(t,`group_id,100)四、结果缓存
4.1 内存缓存
//查询结果缓存到内存 t=loadTable("dfs://page_db","sensor_data")//执行查询并缓存 cachedResult=select device_id,avg(temperature)asavg_tempfromt where date(timestamp)=2024.01.15group by device_id//使用缓存结果 select*fromcachedResult where avg_temp>254.2 共享表缓存
//使用共享表缓存 share cachedResultasshared_cache//其他会话可以使用 select*fromshared_cache4.3 缓存更新策略
//定时更新缓存defupdateCache(){t=loadTable("dfs://page_db","sensor_data")result=select device_id,avg(temperature)asavg_tempfromt where date(timestamp)=today()-1group by device_id//更新共享缓存 shared_cache=result}//每日更新 scheduleJob("update_cache","更新缓存",updateCache,01:00,2024.01.01,2030.12.31,'D')五、异步查询
5.1 异步查询原理
5.2 异步查询实现
//异步查询函数defasyncQuery(queryFunc){//提交查询 jobId=submitJob("async_query","异步查询",queryFunc)//返回作业IDreturnjobId}//查询作业状态defcheckQueryStatus(jobId){returngetJobStatus(jobId)}//获取查询结果defgetQueryResult(jobId){returngetJobReturn(jobId)}//使用异步查询 t=loadTable("dfs://page_db","sensor_data")//提交异步查询 jobId=asyncQuery(def(){returnselect count(*)fromloadTable("dfs://page_db","sensor_data")})//检查状态 checkQueryStatus(jobId)//获取结果 getQueryResult(jobId)六、查询优化技巧
6.1 减少返回列
//不推荐:返回所有列 select*fromt limit1000//推荐:只返回需要的列 selectid,temperaturefromt limit10006.2 使用分区条件
//不推荐:全表扫描 select*fromt limit1000//推荐:分区裁剪 select*fromt where device_idin1..10limit10006.3 避免深分页
//不推荐:深分页(offset大) select*fromt limit100offset100000//推荐:使用游标 select*fromt whereid>100000//使用上一页最后一条的idlimit1006.4 分批处理
//分批处理大数据defbatchProcess(tableName,batchSize,processFunc){t=loadTable(tableName)total=execcount(*)fromtfor(iin0..(total \ batchSize)){batch=select*fromt limit batchSize offset i*batchSize processFunc(batch)}}//使用分批处理 batchProcess("dfs://page_db/sensor_data",10000,def(batch){//处理每批数据print("处理批次: "+string(batch.rows()))})七、实战案例
7.1 设备数据分页查询API
//设备数据分页查询APIdefqueryDeviceData(deviceId,startDate,endDate,pageSize,pageNum){t=loadTable("dfs://page_db","sensor_data")offset=pageSize*(pageNum-1)//查询总数 total=execcount(*)fromt where device_id=deviceIdanddate(timestamp)between startDateandendDate//分页查询 data=select*fromt where device_id=deviceIdanddate(timestamp)between startDateandendDate order by timestamp limit pageSize offset offset//返回结果returndict(STRING,ANY,[["total",total],["pageSize",pageSize],["pageNum",pageNum],["totalPages",ceil(total/pageSize)],["data",data]])}//使用API result=queryDeviceData(1,2024.01.01,2024.01.31,100,1)print("总数: "+string(result["total"]))print("数据: ")print(result["data"])7.2 数据采样分析
//数据采样分析defanalyzeSampleData(sampleRate){t=loadTable("dfs://page_db","sensor_data")//采样数据 sample=select*fromt sample sampleRate//统计分析 stats=select device_id,count(*)ascnt,avg(temperature)asavg_temp,std(temperature)asstd_tempfromsample group by device_idreturnstats}//使用采样分析 analyzeSampleData(0.01)//1%采样八、总结
本文详细介绍了DolphinDB海量数据查询:
- 分页查询:LIMIT、TOP、分页函数
- 数据采样:随机采样、系统采样、分层采样
- 结果缓存:内存缓存、共享表缓存
- 异步查询:异步提交、状态检查、结果获取
- 查询优化:减少列、分区条件、避免深分页
- 实战应用:分页API、采样分析
思考题:
- 如何优化深分页查询?
- 采样查询适合什么场景?
- 如何设计高效的分页API?
参考资料
- DolphinDB分页查询
- DolphinDB数据采样
