Improve carbondata CDC performance

Improve carbondata CDC performance

akashrn5
Hi community,

CarbonData's CDC is an important feature: change data capture is a common
use case in data analytics, where changes in the source data are merged
into a target table. With the current design of CDC, performance suffers
when the data in the target table is huge and the input source data is
large as well. This mail is to discuss improvements to this in a
phase-wise manner.

In the offline discussion in the community, we have decided to go with
min-max based pruning in phase 1. The design doc is linked below; please
check it and give your inputs/suggestions.

https://docs.google.com/document/d/1Qa4yEbKYsYo7LUnnKjKHzF-DMtRUSfjkh9arWawPLSc/edit?usp=sharing
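
At a high level, the idea is to compare the incoming source data's key
range against each target block's stored min/max, and scan only the blocks
that could contain matching rows. Roughly like this sketch in Scala (names
are illustrative, not from the design doc):

// Illustrative only: keep a target block if its [min, max] range
// intersects the overall [min, max] of the incoming source keys.
case class BlockMinMax(path: String, min: Long, max: Long)

def candidateBlocks(blocks: Seq[BlockMinMax],
                    srcMin: Long,
                    srcMax: Long): Seq[BlockMinMax] =
  blocks.filter(b => b.max >= srcMin && b.min <= srcMax)

The merge then only needs to scan the candidate blocks instead of the
whole target table.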

Thanks and Regards,
Akash R

Re: Improve carbondata CDC performance

akashrn5
Hi all,

The design doc is updated; please go through it and give your
inputs/suggestions.

Thanks,

Regards,
Akash R




Re: Improve carbondata CDC performance

David CaiQiang
Hi Akash,
    You can enhance the runtime filter to improve the join performance.

    There is a rule that dynamically checks whether a join can add the
runtime filter or not.

    It would be better to push the runtime filter down into
CarbonDataSourceScan, and better to avoid adding a UDF function to
rewrite the plan.

   



-----
Best Regards
David Cai

Re: Improve carbondata CDC performance

akashrn5
Hi David,


Are you talking about Spark's dynamic runtime filter pushdown? If that is
the case, I had already raised a discussion a few months back to do
something similar in CarbonData to improve carbon joins by using
CarbonData's metadata. But since Spark is already doing it, it was decided
not to make any changes on the carbon side for the same feature and add
complexity.

So this design was decided in the community meeting last week; please
correct me if I'm wrong in understanding your reply.

Also, it is not a plan change; basically, it just adds a WHERE filter with
a custom UDF on block paths to the current join query, which skips
scanning the unwanted block files.
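
Something like the following rough sketch (the UDF name and table names
are hypothetical, not the actual implementation):

import org.apache.spark.sql.SparkSession

object CdcUdfFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cdc-udf-filter-sketch").master("local[*]").getOrCreate()

    // Block paths kept by the min/max pruning step; hard-coded here,
    // in the real flow they come from CarbonData's index metadata.
    val selected = spark.sparkContext.broadcast(
      Set("part-0001.carbondata", "part-0007.carbondata"))
    spark.udf.register("in_selected_blocks",
      (path: String) => selected.value.contains(path))

    // Same join as before, plus the WHERE filter on block paths.
    // input_file_name() returns the file a row was read from.
    spark.sql(
      """SELECT t.*, s.*
        |FROM targetTable t JOIN srcTable s ON t.key = s.key
        |WHERE in_selected_blocks(input_file_name())""".stripMargin)
  }
}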

Regards,
Akash


Re: Improve carbondata CDC performance

akashrn5
In reply to this post by David CaiQiang
Hi,

In addition to this, the probability of false positives will be higher if
we just push a runtime filter of the source data's min/max onto the
target, as it purely depends on the source data.

I had also done a POC by just adding a range filter on the target table.

We need file-level or block-level pruning, like SI does. So this approach
was decided.


Regards
Akash



Re: Improve carbondata CDC performance

David CaiQiang
I mean you can push your logic into CarbonDataSourceScan as a dynamic runtime
filter.

Actually, CarbonDataSourceScan already uses min/max zone maps as an index
filter to prune the block list (in the CarbonScanRDD.getPartitions method).

We can do more things on the join query. Here I assume the source table is
much smaller than the target table.

1. When the join broadcasts the source table:
    1.1 When the join columns contain the partition keys of the target
table, it can reuse the broadcast result to prune the partitions of the
target table.
    1.2 When the join query has some filters on the target table, use
min/max zone maps to prune the block list of the target table.
    1.3 When the join query has some filters on the source table, it can
use the min/max zone maps of the join columns to match against the
broadcast result.

2. When the join doesn't broadcast the source table:
    2.1 When the join query has some filters on the target table, use
min/max zone maps to prune the block list of the target table.
    2.2 Join the source table with the min/max zone maps of the target
table to get the new block list.

In the future, it would be better to move all driver-side pruning logic
into one place and invoke it in CarbonDataSourceScan to get the input
partitions for the scan RDD (including the min/max index, SI, partition
pruning, and dynamic filters).
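
Just to illustrate the shape of that single entry point, a small sketch
(hypothetical names; SI and dynamic filters would slot in the same way):

// Each pruning step is a function from a block list to a smaller block
// list; CarbonDataSourceScan would call prune(...) once to get the
// final list.
case class BlockMeta(path: String, partition: String, min: Long, max: Long)

object PruningPipeline {
  type Pruner = Seq[BlockMeta] => Seq[BlockMeta]

  // Partition pruning (case 1.1): keep blocks in the partitions that
  // the broadcast join keys actually touch.
  def partitionPruner(wantedPartitions: Set[String]): Pruner =
    blocks => blocks.filter(b => wantedPartitions.contains(b.partition))

  // Min/max pruning (cases 1.2/1.3/2.1): keep blocks whose range can
  // contain at least one of the source values.
  def minMaxPruner(srcValues: Set[Long]): Pruner =
    blocks => blocks.filter(b => srcValues.exists(v => v >= b.min && v <= b.max))

  // Compose all pruners in order to get the input partitions.
  def prune(blocks: Seq[BlockMeta], pruners: Seq[Pruner]): Seq[BlockMeta] =
    pruners.foldLeft(blocks)((remaining, p) => p(remaining))
}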



-----
Best Regards
David Cai

Re: Improve carbondata CDC performance

akashrn5
Hi,

I got your point: basically, you want this logic to be useful for normal
joins as well. I had raised a discussion about the same thing before; you
can check it here.
<http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/DISCUSSION-Join-optimization-with-Carbondata-s-metadata-tt103186.html#a103187>

Since Spark is already handling this in its new version, everyone's
opinion was not to build it before Spark does. So this will be specific to
CDC, where it is a little different: we are joining an intermediate
dataframe with the source to get the files to scan. So this should be
fine.

The only problem is the cartesian product, as mentioned in the design doc;
please check it and give your inputs on that. I also have one more
solution: searching in a distributed way with an interval tree data
structure.

Thanks,
Akash




Re: Improve carbondata CDC performance

David CaiQiang
+1, you can finish the implementation.

How about using the following SQL instead of the cartesian join?

SELECT df.filePath
FROM targetTableBlocks df
WHERE EXISTS (SELECT 1 FROM srcTable
              WHERE srcTable.value BETWEEN df.min AND df.max)



-----
Best Regards
David Cai

Re: Improve carbondata CDC performance

akashrn5
Hi David,

Thanks for your suggestion.

I checked the query you suggested locally; it executes as a
*BroadcastNestedLoopJoin*. It goes for that locally because the dataset is
small, but in a cluster, when the data size grows, it falls back to the
cartesian product again.

How about our own search logic in a distributed way using an interval tree
data structure? It will be faster and won't have much impact.
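
Roughly, the idea would be to build the tree once over the target blocks'
[min, max] ranges, broadcast it, and let each task stab it with its source
values. A minimal (non-distributed) sketch, with illustrative names only:

// Centered interval tree for stabbing queries: given block [min, max]
// ranges, find the blocks whose range contains a given value.
final case class Interval(min: Long, max: Long, blockPath: String)

final class IntervalTree private (center: Long,
                                  atNode: Seq[Interval],
                                  left: Option[IntervalTree],
                                  right: Option[IntervalTree]) {
  // Return every interval that contains the value v.
  def stab(v: Long): Seq[Interval] = {
    val here = atNode.filter(i => i.min <= v && v <= i.max)
    val below =
      if (v < center) left.map(_.stab(v)).getOrElse(Nil)
      else if (v > center) right.map(_.stab(v)).getOrElse(Nil)
      else Nil
    here ++ below
  }
}

object IntervalTree {
  def build(intervals: Seq[Interval]): Option[IntervalTree] =
    if (intervals.isEmpty) None
    else {
      // Pick the median endpoint as the center; intervals entirely to
      // the left or right go into the subtrees, the rest stay here.
      val endpoints = intervals.flatMap(i => Seq(i.min, i.max)).sorted
      val center = endpoints(endpoints.length / 2)
      val (leftOnly, rest) = intervals.partition(_.max < center)
      val (rightOnly, atNode) = rest.partition(_.min > center)
      Some(new IntervalTree(center, atNode, build(leftOnly), build(rightOnly)))
    }
}

Each stab is logarithmic in the number of blocks plus the matches, so it
should avoid the full pairwise comparison of the cartesian product.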

Others, please give your suggestions.

Thanks

Regards,
Akash R




Re: Improve carbondata CDC performance

VenuReddy
In reply to this post by akashrn5

Hi Akash,

+1

Just a few queries regarding the query to filter only the necessary target
files/blocks:

SELECT targetTable.filepath
FROM targetTable, srcTable
WHERE srcTable.value BETWEEN targetTable.min AND targetTable.max;

1. This range will be one per extended blocklet, right? I mean, the number
of ranges in the condition is equal to the number of extended blocklets
returned. Do we merge these ranges (in case of overlaps) to reduce the
number of conditions?

2. Would doing an aggregate/group by on target.filepath help?
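
For question 1, merging could be as simple as this sketch (sort by min,
then fold neighbouring ranges together; illustrative code only):

// Merge overlapping [min, max] ranges to reduce the number of
// BETWEEN conditions in the generated filter.
def mergeRanges(ranges: Seq[(Long, Long)]): Seq[(Long, Long)] =
  ranges.sortBy(_._1).foldLeft(List.empty[(Long, Long)]) {
    case ((lo, hi) :: tail, (min, max)) if min <= hi =>
      (lo, math.max(hi, max)) :: tail // overlaps previous range: extend it
    case (acc, next) => next :: acc   // disjoint: start a new range
  }.reverse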

Regards,
Venu




Re: Improve carbondata CDC performance

akashrn5
Hi Venu,

Thanks for your review.

I have replied to the same in the document. You are right:

1. It is taken care of: the extended blocklets are grouped by split path
and the min-max is taken at the block level.
2. We need to do a group by on the file path to avoid duplicates from the
dataframe output. I have updated the same in the doc; please have a look.
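
For point 2, the dedup step would look something like this sketch
(`pruned` is a placeholder for the dataframe coming out of the pruning
join):

import org.apache.spark.sql.DataFrame

// Keep each block file path once, regardless of how many source rows
// matched it.
def dedupPaths(pruned: DataFrame): DataFrame =
  pruned.select("filepath").distinct()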

Thanks,
Akash R


