Throw NullPointerException occasionally when query from stream table


xm_zzc
Hi:
  I ran a structured streaming app in local[4] mode (Spark 2.3.2 +
CarbonData master branch) to insert data, then started a thread to
execute a select SQL; a NullPointerException occurred occasionally.
  *I found that the smaller the value of CarbonCommonConstants.HANDOFF_SIZE
is, the more easily the error occurs.*
  Please see my test code: CarbonStream1_5.scala
<http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/file/t133/CarbonStream1_5.scala>
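For reference, the handoff threshold can be lowered through CarbonProperties before the streaming job starts. This is only a sketch of the relevant setting (the value shown is illustrative, and CarbonData may clamp it to an internal minimum):

```scala
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// Lower the streaming handoff threshold so handoff happens more often;
// smaller values make the NPE easier to reproduce (value is illustrative).
CarbonProperties.getInstance()
  .addProperty(CarbonCommonConstants.HANDOFF_SIZE, "67108864")
```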
 
  The NullPointerException is:
  Exception in thread "Thread-42" java.lang.NullPointerException
        at org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1001)
        at org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap.prune(BlockDataMap.java:656)
        at org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap.prune(BlockDataMap.java:743)
        at org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory.getAllBlocklets(BlockletDataMapFactory.java:391)
        at org.apache.carbondata.core.datamap.TableDataMap.prune(TableDataMap.java:132)
        at org.apache.carbondata.hadoop.api.CarbonInputFormat.getPrunedBlocklets(CarbonInputFormat.java:491)
        at org.apache.carbondata.hadoop.api.CarbonInputFormat.getDataBlocksOfSegment(CarbonInputFormat.java:412)
        at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:528)
        at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:219)
        at org.apache.carbondata.spark.rdd.CarbonScanRDD.internalGetPartitions(CarbonScanRDD.scala:127)
        at org.apache.carbondata.spark.rdd.CarbonRDD.getPartitions(CarbonRDD.scala:67)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:321)
        at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.doExecute(limit.scala:154)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:725)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:702)
        at cn.xm.zzc.carbonmaster.CarbonStream1_5$$anon$1$$anonfun$run$1.apply$mcVI$sp(CarbonStream1_5.scala:202)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166)
        at cn.xm.zzc.carbonmaster.CarbonStream1_5$$anon$1.run(CarbonStream1_5.scala:200)
       
        *BTW, the index files were not merged after handoff; the handoff
operation does not support merging index files yet, right?*
 



--
Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/

Re: Throw NullPointerException occasionally when query from stream table

xm_zzc
Hi:
  I added some logs to trace this problem and found that when
BlockDataMap.getFileFooterEntrySchema was called, the key
'segmentPropertiesIndex' stored in the BlockDataMap instance had already
been removed by another thread from
SegmentPropertiesAndSchemaHolder.indexToSegmentPropertiesWrapperMapping:

2018-10-31 14:49:24,967 datastore.block.SegmentPropertiesAndSchemaHolder.addSegmentProperties(SegmentPropertiesAndSchemaHolder.java:115) Thread-39 -========put 37 into indexToSegmentPropertiesWrapperMapping 0

2018-10-31 14:49:25,472 datastore.block.SegmentPropertiesAndSchemaHolder.invalidate(SegmentPropertiesAndSchemaHolder.java:243) Executor task launch worker for task 926 -========remove 37 out of indexToSegmentPropertiesWrapperMapping 31

2018-10-31 14:49:25,486 indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1002) Thread-39 -========get 37 null

The key segmentPropertiesIndex=37 was removed at 2018-10-31 14:49:25,472, before Thread-39 read it at 14:49:25,486.

2018-10-31 14:56:45,057 datastore.block.SegmentPropertiesAndSchemaHolder.addSegmentProperties(SegmentPropertiesAndSchemaHolder.java:115) Thread-39 -========put 98 into indexToSegmentPropertiesWrapperMapping 0

2018-10-31 14:56:45,477 datastore.block.SegmentPropertiesAndSchemaHolder.invalidate(SegmentPropertiesAndSchemaHolder.java:243) Executor task launch worker for task 2653 -========remove 98 out of indexToSegmentPropertiesWrapperMapping 67

2018-10-31 14:56:46,290 indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1002) Thread-39 -========get 98 null

2018-10-31 14:56:51,392 indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1002) Thread-39 -========get 98 null





Re: Throw NullPointerException occasionally when query from stream table

xm_zzc
Hi:
  The root cause is that when a select SQL executes, BlockDataMap calls
'SegmentPropertiesAndSchemaHolder.addSegmentProperties' to add segment info
one by one. Meanwhile, if some segments are updated (for example, a stream
segment is handed off), the handoff thread calls
'SegmentPropertiesAndSchemaHolder.invalidate' to delete segment info one by
one; if segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty() is
true, it removes the segmentPropertiesIndex. But the select thread is still
using that segmentPropertiesIndex to add/get segment info, and then the NPE
occurs.
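The interleaving can be modeled with a minimal, self-contained sketch. The class and method names below only mirror the real SegmentPropertiesAndSchemaHolder for readability; this is not CarbonData code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the holder: segmentPropertiesIndex -> wrapper.
class Holder {
    static final Map<Integer, String> indexToWrapper = new ConcurrentHashMap<>();

    // Select thread: reads the wrapper back by the index it registered earlier.
    static String getFileFooterEntrySchema(int index) {
        // Returns null when another thread invalidated the entry in between;
        // the real caller then dereferences the null wrapper and throws NPE.
        return indexToWrapper.get(index);
    }

    // Handoff thread: drops the entry once its segmentIdSet becomes empty.
    static void invalidate(int index, boolean segmentIdSetEmpty) {
        if (segmentIdSetEmpty) {
            indexToWrapper.remove(index);
        }
    }
}

public class RaceSketch {
    public static void main(String[] args) {
        Holder.indexToWrapper.put(37, "wrapper-37"); // addSegmentProperties (select thread)
        Holder.invalidate(37, true);                 // handoff thread wins the race
        String wrapper = Holder.getFileFooterEntrySchema(37);
        System.out.println(wrapper);                 // prints "null": the lookup lost the race
    }
}
```

The sketch shows a classic check-then-act race: the map itself is thread-safe, but nothing keeps the index alive between the select thread registering it and reading it back.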




Re: Throw NullPointerException occasionally when query from stream table

David CaiQiang
Where do we call SegmentPropertiesAndSchemaHolder.invalidate in the handoff
thread?



-----
Best Regards
David Cai

Re: Throw NullPointerException occasionally when query from stream table

xm_zzc