@@ -15,6 +15,8 @@ import org.apache.spark.sql.functions._
1515import org .apache .spark .sql .types .StringType
1616import org .locationtech .jts .{geom => jts }
1717
18+ import scala .reflect .ClassTag
19+
1820object VectorPipe {
1921
2022 /** Vectortile conversion options.
@@ -46,7 +48,10 @@ object VectorPipe {
// Convenience factory: builds an Options from `zoom` and the source `crs`, filling the two
// remaining positional arguments with Some(0) and None.
// NOTE(review): the Options parameter list is not visible here; Some(0) presumably selects
// zoom 0 as the lowest level to generate -- confirm against the Options definition.
4648 def forAllZoomsWithSrcProjection (zoom : Int , crs : CRS ) = Options (zoom, Some (0 ), crs, None )
4749 }
4850
49- def apply (input : DataFrame , pipeline : vectortile.Pipeline , options : Options ): Unit = {
51+ def apply [T : ClassTag ](input : DataFrame , pipeline : vectortile.Pipeline , options : Options ): Unit = {
52+ import input .sparkSession .implicits ._
53+ import vectorpipe .encoders ._
54+
5055 val geomColumn = pipeline.geometryColumn
5156 assert(input.columns.contains(geomColumn) &&
5257 input.schema(geomColumn).dataType.isInstanceOf [org.apache.spark.sql.jts.AbstractGeometryUDT [jts.Geometry ]],
@@ -74,46 +79,49 @@ object VectorPipe {
7479 SpatialKey (k.col / 2 , k.row / 2 ) }.toSeq
7580 }
7681
/** Builds one encoded vector tile per spatial key for a single layout level.
  *
  * NOTE(review): this span is rendered as a diff. Lines marked `-` are the previous
  * RDD-based implementation (returning `RDD[(SpatialKey, VectorTile)]`); lines marked `+`
  * are the Dataset-based replacement (returning `Dataset[(SpatialKey, Array[Byte])]`).
  * The description below follows the `+` lines.
  *
  * Steps: apply `pipeline.select` when one is defined, explode `keyColumn`, apply
  * `pipeline.clip` when one is defined (after repartitioning by key), then group the rows
  * by spatial key and call `buildVectorTile(...).toBytes` on each group.
  *
  * `pipeline`, `options`, `keyColumn`, `geomColumn`, `getSpatialKey` and `buildVectorTile`
  * are not defined in this block; they are resolved from an enclosing scope. The type
  * parameter `G` is not referenced in the body shown.
  *
  * @param df    input rows; the body reads and rewrites `keyColumn` and `geomColumn`
  * @param level layout level supplying the zoom and the key-to-extent transform
  * @return pairs of spatial key and the bytes returned by `.toBytes` on the built tile
  */
77- def generateVectorTiles [G <: Geometry ](df : DataFrame , level : LayoutLevel ): RDD [(SpatialKey , VectorTile )] = {
82+ def generateVectorTiles [G <: Geometry ](df : DataFrame , level : LayoutLevel ): Dataset [(SpatialKey , Array [ Byte ] )] = {
7883 val zoom = level.zoom
79- val clip = udf { (g : jts.Geometry , key : GenericRowWithSchema ) =>
80- val k = getSpatialKey(key)
81- pipeline.clip(g, k, level)
82- }
8384
84- val selectedGeometry = pipeline
85- .select(df, zoom, keyColumn)
// Optional selection step: when the pipeline defines no selector, the input frame is used unchanged.
85+ val selectedGeometry = pipeline.select match {
86+ case None => df
87+ case Some (select) => select(df, zoom, keyColumn)
88+ }
8689
87- val clipped = selectedGeometry
// Replace keyColumn by its exploded form, giving one row per element of the original column value.
90+ val keyed = selectedGeometry
8891 .withColumn(keyColumn, explode(col(keyColumn)))
89- .repartition(col(keyColumn)) // spread copies of possibly ill-tempered geometries around cluster prior to clipping
90- .withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))
92+
// Optional clipping step: the repartition by key is performed only when a clipper is defined.
93+ val clipped = pipeline.clip match {
94+ case None => keyed
95+ case Some (clipper) =>
96+ val clip = udf { (g : jts.Geometry , key : GenericRowWithSchema ) =>
97+ val k = getSpatialKey(key)
98+ clipper(g, k, level)
99+ }
100+ val toClip = keyed.repartition(col(keyColumn)) // spread copies of possibly ill-tempered geometries around cluster prior to clipping
101+ toClip.withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))
102+ }
91103
// Tile assembly differs by layer multiplicity: one fixed layer name, or a layer name read from a column.
92104 pipeline.layerMultiplicity match {
93105 case SingleLayer (layerName) =>
94106 clipped
95- .rdd
96- .map { r => (getSpatialKey(r, keyColumn), pipeline.pack(r, zoom)) }
97- .groupByKey
98- .map { case (key, feats) =>
// Each row becomes a (key, packed feature) entry; entries sharing a key are combined into one tile.
107+ .map { r => SingleLayerEntry (getSpatialKey(r, keyColumn), pipeline.pack(r, zoom)) }
108+ .groupByKey(_.key)
109+ .mapGroups { (key : SpatialKey , sleIter : Iterator [SingleLayerEntry ]) =>
99110 val ex = level.layout.mapTransform.keyToExtent(key)
100- key -> buildVectorTile(feats , layerName, ex, options.tileResolution, options.orderAreas)
111+ key -> buildVectorTile(sleIter.map(_.feature).toIterable , layerName, ex, options.tileResolution, options.orderAreas).toBytes
101112 }
102113 case LayerNamesInColumn (layerNameCol) =>
// The layer-name column must be of String type; the check reads the schema of selectedGeometry.
103114 assert(selectedGeometry.schema(layerNameCol).dataType == StringType ,
104115 s " layerMultiplicity= ${pipeline.layerMultiplicity} requires String-type column of name ${layerNameCol}" )
116+
105117 clipped
106- .rdd
107- .map { r => (getSpatialKey(r, keyColumn), r.getAs[String ](layerNameCol) -> pipeline.pack(r, zoom)) }
108- .groupByKey
109- .mapPartitions{ iter : Iterator [(SpatialKey , Iterable [(String , VectorTileFeature [Geometry ])])] =>
110- iter.map{ case (key, groupedFeatures) => {
111- val layerFeatures : Map [String , Iterable [VectorTileFeature [Geometry ]]] =
112- groupedFeatures.groupBy(_._1).mapValues(_.map(_._2))
113- val ex = level.layout.mapTransform.keyToExtent(key)
114- key -> buildVectorTile(layerFeatures, ex, options.tileResolution, options.orderAreas)
115- }}
116- }
// Each row becomes a (key, layer name, packed feature) entry, grouped by key as in the single-layer case.
118+ .map { r => MultipleLayerEntry (getSpatialKey(r, keyColumn), r.getAs[String ](layerNameCol), pipeline.pack(r, zoom)) }
119+ .groupByKey(_.key)
120+ .mapGroups{ (key : SpatialKey , iter : Iterator [MultipleLayerEntry ]) =>
121+ val ex = level.layout.mapTransform.keyToExtent(key)
// Bucket this key's features by layer name before handing them to buildVectorTile.
122+ val layerFeatures = iter.toSeq.groupBy(_.layer).mapValues(_.map(_.feature))
123+ key -> buildVectorTile(layerFeatures, ex, options.tileResolution, options.orderAreas).toBytes
124+ }
117125 }
118126 }
119127
@@ -134,16 +142,30 @@ object VectorPipe {
134142 } else {
135143 df
136144 }
137- val simplify = udf { g : jts.Geometry => pipeline.simplify(g, level.layout) }
138- val reduced = pipeline
139- .reduce(working, level, keyColumn)
140- val prepared = reduced
141- .withColumn(geomColumn, simplify(col(geomColumn)))
142- val vts = generateVectorTiles(prepared, level)
145+
146+ val reduced = pipeline.reduce match {
147+ case None => working
148+ case Some (reduce) => reduce(working, level, keyColumn)
149+ }
150+
151+ val simplified = pipeline.simplify match {
152+ case None => reduced
153+ case Some (simplifier) =>
154+ val simplify = udf { g : jts.Geometry => simplifier(g, level.layout) }
155+ reduced.withColumn(geomColumn, simplify(col(geomColumn)))
156+ }
157+
158+ val vts = generateVectorTiles(simplified, level)
143159 saveVectorTiles(vts, zoom, pipeline.baseOutputURI)
144- prepared.withColumn(keyColumn, reduceKeys(col(keyColumn)))
160+
161+ simplified.withColumn(keyColumn, reduceKeys(col(keyColumn)))
145162 }
146163
147164 }
148165
// Intermediate record types used as Dataset elements while grouping features by tile key.
// SingleLayerEntry pairs a spatial key with one packed feature; MultipleLayerEntry also
// carries the name of the layer the feature belongs to.
166+ private case class SingleLayerEntry (key : SpatialKey , feature : VectorTileFeature [Geometry ])
167+ private case class MultipleLayerEntry (key : SpatialKey , layer : String , feature : VectorTileFeature [Geometry ])
168+
// Kryo-based encoders for the two entry types, exposed as implicits so that Dataset
// operations producing these types can resolve an Encoder.
169+ private implicit def sleEncoder : Encoder [SingleLayerEntry ] = Encoders .kryo[SingleLayerEntry ]
170+ private implicit def mleEncoder : Encoder [MultipleLayerEntry ] = Encoders .kryo[MultipleLayerEntry ]
149171}
0 commit comments