Hi community,
CarbonData's CDC is an important feature: merging source data changes into a target table is a common use case in data analytics. With the current CDC design, performance is poor when the data in the target table is huge and the input source data is large as well. This mail is to discuss improvements in phases. In the offline community discussion we decided to go with min/max-based pruning in phase 1. The design doc is linked below; please check it and give your inputs/suggestions.

https://docs.google.com/document/d/1Qa4yEbKYsYo7LUnnKjKHzF-DMtRUSfjkh9arWawPLSc/edit?usp=sharing

Thanks and Regards,
Akash R
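[For readers new to the use case, a minimal sketch of the kind of merge this thread is about, as a Spark SQL statement. The table, column, and op-flag names are illustrative, and the exact MERGE INTO syntax depends on the CarbonData version.]

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("cdc-merge").getOrCreate()

    // Apply a batch of source changes (with a hypothetical 'op' flag for
    // deletes) onto the target table in one merge.
    spark.sql(
      """MERGE INTO target t
        |USING source s
        |ON t.id = s.id
        |WHEN MATCHED AND s.op = 'D' THEN DELETE
        |WHEN MATCHED THEN UPDATE SET t.value = s.value
        |WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)""".stripMargin)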
Hi all,
The design doc is updated; please go through it and give your inputs/suggestions.

Thanks and Regards,
Akash R
Hi Akash,
You can enhance the runtime filter to improve the join performance. Spark has a rule that dynamically checks whether a join can add a runtime filter. It would be better to push the runtime filter down into CarbonDataSourceScan, and to avoid adding a UDF function to rewrite the plan.
Best Regards
David Cai
Hi David,
Are you talking about Spark's dynamic runtime filter pushdown? If so, I had already raised a discussion a few months back about doing something similar in CarbonData to improve joins using CarbonData's metadata. But since Spark already does this, it was decided not to make changes on the Carbon side for the same feature and add complexity. That is why this design was decided in the community meeting last week; please correct me if I'm wrong in understanding your reply.

Also, it is not a plan change: basically, it just adds a WHERE filter with a custom UDF over block paths to the current join query, which skips scanning the unwanted block files.

Regards,
Akash
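[A rough sketch of that idea, not CarbonData's actual implementation: the pruned-block set and the "file_path" column are hypothetical stand-ins for the block paths selected by min/max pruning.]

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, udf}

    val spark = SparkSession.builder().getOrCreate()

    // Hypothetical: block paths that survived min/max pruning.
    val prunedBlocks: Set[String] =
      Set("/store/target/part-0001", "/store/target/part-0007")

    // The UDF keeps only rows belonging to a surviving block file.
    val inPrunedBlocks = udf((path: String) => prunedBlocks.contains(path))

    val target = spark.table("target")
    val source = spark.table("source")

    // The join itself is unchanged; the extra WHERE clause lets the scan
    // skip block files that cannot contain matching rows.
    val joined = target
      .where(inPrunedBlocks(col("file_path")))
      .join(source, target("id") === source("id"))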
Hi,
In addition to this, the probability of false positives will be high if we just push a runtime filter built from the source data's min/max onto the target, since it depends purely on the source data. I even did a POC that just added a range filter on the target table. We need file-level or block-level pruning, like SI does, which is why this approach was decided.

Regards,
Akash
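[A tiny illustration of the false-positive point above, with made-up numbers: a single range filter [min(src), max(src)] keeps far more blocks than a block-level check against the actual source keys.]

    case class Block(path: String, min: Long, max: Long)

    val blocks = Seq(
      Block("b1", 1L, 10L),
      Block("b2", 100L, 200L),
      Block("b3", 8900L, 9100L))
    val srcKeys = Seq(5L, 9000L)

    // Global range filter: [5, 9000] overlaps all three blocks.
    val globalKept = blocks.filter(b => b.max >= srcKeys.min && b.min <= srcKeys.max)

    // Block-level pruning: only b1 and b3 actually contain a source key.
    val blockKept = blocks.filter(b => srcKeys.exists(k => k >= b.min && k <= b.max))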
I mean you can push your logic into CarbonDataSourceScan as a dynamic runtime filter. Actually, CarbonDataSourceScan already uses min/max zone maps as an index filter to prune the block list (in the CarbonScanRDD.getPartition method). We can do more things on the join query. Here I assume the source table is much smaller than the target table.

1. When the join broadcasts the source table:
   1.1 When the join columns contain the partition keys of the target table, it can reuse the result of the broadcast to prune the partitions of the target table.
   1.2 When the join query has some filters on the target table, use min/max zone maps to prune the block list of the target table.
   1.3 When the join query has some filters on the source table, it can use min/max zone maps of the join columns to match the result of the broadcast.

2. When the join doesn't broadcast the source table:
   2.1 When the join query has some filters on the target table, use min/max zone maps to prune the block list of the target table.
   2.2 Join the source table with the min/max zone maps of the target table to get the new block list.

In the future, it would be better to move all driver-side pruning logic into one place and invoke it in CarbonDataSourceScan to get the input partitions for the scan RDD (including the min/max index, SI, partition pruning, and dynamic filters).
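[A minimal sketch of case 1.1 above, under assumed table and column names, with "dt" standing in for the target's partition column.]

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().getOrCreate()

    // The distinct join keys of the (small) source table.
    val srcKeys = spark.table("source")
      .select("dt").distinct()
      .collect()
      .map(_.getString(0))
      .toSeq

    // Because "dt" is the partition column of the target table, this
    // filter prunes whole partitions at scan time before the join runs.
    val prunedTarget = spark.table("target").where(col("dt").isin(srcKeys: _*))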
Best Regards
David Cai
Hi,
I got your point: basically you want this logic to be useful for normal joins as well. I had raised a discussion about exactly that earlier; you can check it here:

http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/DISCUSSION-Join-optimization-with-Carbondata-s-metadata-tt103186.html#a103187

Since Spark already handles this in its newer versions, everyone's opinion was not to do it ahead of Spark. So this will be specific to CDC, where things are a little different: we are joining an intermediate dataframe with the source to get the files to scan, so this should be fine. The only open problem is the cartesian product, as mentioned in the design doc; please check it and give your inputs on that. I also have one more solution: searching in a distributed way with an interval tree data structure.

Thanks,
Akash
+1, you can go ahead with the implementation.
How about using the following SQL instead of the cartesian join?

    SELECT df.filePath
    FROM targetTableBlocks df
    WHERE EXISTS (SELECT 1 FROM srcTable
                  WHERE srcTable.value BETWEEN df.min AND df.max)
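[For anyone who wants to check which join strategy Spark actually picks for this rewrite, a quick sketch using the same names as in the query above.]

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // Print the physical plan to see how the EXISTS subquery is joined.
    spark.sql(
      """SELECT df.filePath
        |FROM targetTableBlocks df
        |WHERE EXISTS (SELECT 1 FROM srcTable
        |              WHERE srcTable.value BETWEEN df.min AND df.max)""".stripMargin)
      .explain(true)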
Best Regards
David Cai
Hi David,
Thanks for your suggestion. I tried the query you suggested locally, and it executes as a *BroadcastNestedLoopJoin*. It takes that path only because the local dataset is small; on a cluster, when the data size grows, it falls back to a cartesian product again. How about our own search logic in a distributed way using an interval tree data structure? It would be faster and would not add much overhead. Others, please give your suggestions as well.

Thanks and Regards,
Akash R
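[A minimal sketch of the proposed distributed search, under assumed names: broadcast the per-block [min, max] ranges and probe them from each partition of the source. The linear probe is a stand-in; a real implementation would replace it with an interval tree so each lookup is O(log n + k).]

    import org.apache.spark.sql.SparkSession

    case class BlockRange(path: String, min: Long, max: Long)

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    // Hypothetical ranges, one per pruned block of the target table.
    val ranges = Array(
      BlockRange("/store/t/b1", 1L, 10L),
      BlockRange("/store/t/b2", 100L, 200L))
    val bcRanges = spark.sparkContext.broadcast(ranges)

    // Every partition probes the broadcast ranges with its source keys
    // and emits the paths of blocks that may contain a match.
    val matchedPaths = spark.table("srcTable")
      .select($"value".cast("long")).as[Long]
      .mapPartitions { keys =>
        val rs = bcRanges.value
        keys.flatMap(k =>
          // an interval tree would replace this linear scan
          rs.iterator.filter(r => k >= r.min && k <= r.max).map(_.path))
      }
      .distinct()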
Hi Akash,

+1. Just a few queries regarding the query that filters only the necessary target files/blocks:

    SELECT target.filepath
    FROM targetTable, srcTable
    WHERE srcTable.value BETWEEN targetTable.min AND targetTable.max;

1. There will be one range per extended blocklet, right? I.e., the number of ranges in the condition equals the number of extended blocklets returned. Do we merge these ranges (in case of overlaps) to reduce the number of conditions?
2. Does doing an aggregate/group by on target.filepath help?

Regards,
Venu
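[On question 1, a standard way to collapse overlapping ranges, shown only for illustration of what "merging" would mean here.]

    // Sort by range start, then fold: extend the current range while the
    // next one overlaps it, otherwise start a new range.
    def mergeRanges(rs: Seq[(Long, Long)]): Seq[(Long, Long)] =
      rs.sortBy(_._1).foldLeft(List.empty[(Long, Long)]) {
        case ((lo, hi) :: acc, (min, max)) if min <= hi =>
          (lo, math.max(hi, max)) :: acc
        case (acc, r) => r :: acc
      }.reverse

    // mergeRanges(Seq((1L, 5L), (3L, 8L), (10L, 12L)))
    //   == Seq((1L, 8L), (10L, 12L))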
Hi Venu,
Thanks for your review. I have replied to the same in the document. You are right:

1. It is taken care of: we group the extended blocklets by split path and get the min/max at the block level.
2. We need to group by the file path to avoid duplicates in the dataframe output.

I have updated the doc accordingly; please have a look.

Thanks,
Akash R
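[For illustration, the earlier pruning query with the deduplication applied, using the names from David's and Venu's examples.]

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // Each surviving file path comes out once, regardless of how many
    // source rows fall into its [min, max] range.
    val filesToRescan = spark.sql(
      """SELECT df.filePath
        |FROM targetTableBlocks df, srcTable
        |WHERE srcTable.value BETWEEN df.min AND df.max
        |GROUP BY df.filePath""".stripMargin)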