APACHE SPARK
============
A general-purpose, plug-and-play, in-memory compute engine.
compute engine:
---------------
Hadoop = HDFS (storage) + MapReduce (processing) + YARN (resource management)
Spark is an alternative to MapReduce, not to all of Hadoop.
Spark needs 2 things:
  Storage: local storage, HDFS, Amazon S3
  Resource manager: YARN, Kubernetes, Mesos
in-memory
---------
In MapReduce:
  mr1 -> mr2 -> mr3 -> mr4
  HDFS read and write happen before and after each MapReduce job
  => 2 disk I/Os for every MR job in the chain
In Spark:
  v1 -> v2 -> v3 -> v4
  initial read from HDFS, then everything stays in memory, then a final write
  => only 2 disk I/Os for the whole chain
10-100 times faster than MapReduce
general purpose
---------------
In the Hadoop ecosystem, each task needs its own tool:
  Pig for cleaning
  Hive for querying
  Mahout for ML
  Sqoop for migrating data to/from databases
  Storm for streaming data
With Spark you learn 1 style of writing code and can do all of the above.
Spark is not bound to map-reduce only:
  map + reduce are there
  + a lot of other operations
----------------------------------------------------------------------------------------------------------------------------------------------
RDD: Resilient Distributed Dataset; the basic unit which holds data in Spark
  An in-memory, distributed collection
  Resilient (fault tolerant): if RDD3 is lost, Spark checks its lineage graph, finds the parent RDD2, and quickly re-applies the transformation to regenerate RDD3
  Immutable
Why are transformations lazy?
  So that Spark can internally optimize the operations across the DAG (e.g. predicate pushdown)
----------------------------------------------------------------------------------------------------------------------------------------------
spark-shell (Scala) / pyspark (Python)
sc = SparkContext, created automatically in the shell

>>spark-shell
val rdd1 = sc.textFile("/user/cloudera/sparkInput/")   // read the input files
val rdd2 = rdd1.flatMap(_.split(" "))                  // split lines into words
val rdd3 = rdd2.map(x => (x, 1))                       // pair each word with 1
val rdd4 = rdd3.reduceByKey(_ + _)                     // sum the counts per word
rdd4.collect()                                         // action: bring results to the driver
// Spark UI: localhost:4040 (or 4041 if 4040 is taken)
----------------------------------------------------------------------------------------------------------------------------------------------
Writing in an IDE:
  extend App, or declare a main method
In the shell sc was available automatically; in an IDE we create it ourselves:
  val sc = new SparkContext("local[*]", "<appName>")
Spark 2.4.4 uses Scala 2.11
To organize imports in Eclipse: Ctrl+Shift+O
When run from an IDE, the DAG (Spark UI) is visible only while the program is running.
  So admins set up a history server for us to see the DAG afterwards.
  To keep the UI alive, block at the end of the program:
    scala.io.StdIn.readLine()
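A minimal sketch of the full IDE program, assuming Spark 2.4.x with Scala 2.11 on the classpath (object and path names are illustrative):

CODE:
import org.apache.spark.SparkContext

object WordCount extends App {
  // "local[*]" runs Spark locally with one thread per core
  val sc = new SparkContext("local[*]", "WordCount")
  val counts = sc.textFile("/path/to/input")
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
  counts.collect().foreach(println)
  // keep the Spark UI at localhost:4040 alive until Enter is pressed
  scala.io.StdIn.readLine()
  sc.stop()
}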
---------------------------------------------------------------------------------------------------------------------------------------------
val input = sc.textFile("/path")
val mappedInput = input.map(x => x.split("\t")(2))   // keep only column index 2
val results = mappedInput.countByValue()             // action: returns a local Map
results.foreach(println)
// map + reduceByKey = countByValue (see the sketch below)
// On a pair RDD, .map(x => (x._1, (x._2, 1))) can be shortened to .mapValues(x => (x, 1))
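A sketch of that equivalence, reusing mappedInput from above (countByValue and collectAsMap both return local collections on the driver):

CODE:
// action: returns a local Map[String, Long]
val viaCountByValue = mappedInput.countByValue()

// hand-rolled equivalent: map + reduceByKey, then bring the result to the driver
val viaReduceByKey = mappedInput.map(x => (x, 1L)).reduceByKey(_ + _).collectAsMap()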
---------------------------------------------------------------------------------------------------------------------------------------------
pyspark: converting the Scala word count to Python
Step 1: remove val/var
Step 2: an anonymous function is called a lambda, so prefix every anonymous function with lambda
Step 3: => becomes :
sparkScala code:
val rdd2 = rdd1.flatMap(x => x.split(" "))
val intermediateRdd = rdd2.map(x => x.toLowerCase())
val rdd3 = intermediateRdd.map(x => (x,1))
val rdd4 = rdd3.reduceByKey((x,y) => x+y)
Step1:
rdd2 = rdd1.flatMap(x => x.split(" "))
intermediateRdd = rdd2.map(x => x.toLowerCase())
rdd3 = intermediateRdd.map(x => (x,1))
rdd4 = rdd3.reduceByKey(x,y => x+y)
Step2:
rdd2 = rdd1.flatMap(lambda x => x.split(" "))
intermediateRdd = rdd2.map(lambda x => x.toLowerCase())
rdd3 = intermediateRdd.map(lambda x => (x,1))
rdd4 = rdd3.reduceByKey(lambda x,y => x+y)
Step3:
rdd2 = rdd1.flatMap(lambda x : x.split(" "))
intermediateRdd = rdd2.map(lambda x : x.lower())   # Scala's toLowerCase() is lower() in Python
rdd3 = intermediateRdd.map(lambda x : (x,1))
rdd4 = rdd3.reduceByKey(lambda x,y : x+y)
Instead of .collect write .collect()
We can also save the result as a text file:
rdd4.saveAsTextFile("/path")
To read from the local file system instead of HDFS, use the file:// scheme:
val rdd1 = sc.textFile("file:///home/cloudera/Desktop/text.txt")
---------------------------------------------------------------------------------------------------------------------------------------------
SCALA SPARK:
A map-side join in Hive is called a broadcast join in Spark.
It is achieved using a broadcast variable.
To open a file on the local machine (needs import scala.io.Source):
  Source.fromFile("/path")
To broadcast:
  val <name> = sc.broadcast(<aLocalMap/Set/Array/etc>)
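A minimal broadcast-join sketch, assuming a small comma-separated lookup file on the local machine (paths and field layout are illustrative):

CODE:
import scala.io.Source

// build a small lookup map on the driver, e.g. id -> name
val lookup = Source.fromFile("/path/to/small.txt")
  .getLines()
  .map { line => val f = line.split(","); (f(0), f(1)) }
  .toMap

// ship one read-only copy of the map to every executor
val lookupBc = sc.broadcast(lookup)

// map-side (broadcast) join: the big RDD is never shuffled
val big = sc.textFile("/path/to/big.txt").map(_.split(",")(0))
val joined = big.map(id => (id, lookupBc.value.getOrElse(id, "unknown")))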
-----------------------------------------------------------------------------------------------------------------------------------------------------
SPARK ACCUMULATOR
=================
A shared variable, updatable from every machine (executor)
An executor cannot read the accumulator's value, it can only update it; only the driver reads it
Same as counters in MR
CODE:
val input = sc.textFile("/path")
val myAccum = sc.longAccumulator("Blank lines accumulator")
input.foreach(x => if (x == "") myAccum.add(1))
myAccum.value   // read on the driver
2 kinds of shared variables
---------------------------
1. Broadcast variable: a separate read-only copy on each machine (same as a map-side join in Hive)
2. Accumulator: a single copy on the driver machine (same as counters in MR)
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
No. of partitions and creating an RDD from a local variable
===========================================================
sc.parallelize(myList) ==> converts a local collection to an RDD
The no. of partitions created here defaults to:
  sc.defaultParallelism (can be changed)
To check the no. of partitions:
  <RDDname>.getNumPartitions
Property for the minimum no. of partitions:
  sc.defaultMinPartitions
> This property comes into play when we load from file systems.
> Even if the file is smaller than the block size for that machine (32 MB locally, 128 MB in HDFS), Spark still creates sc.defaultMinPartitions partitions.
> If the size is more than the default block size, no problem.
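A short sketch of these properties, assuming a spark-shell session:

CODE:
val myList = List(1, 2, 3, 4, 5)

val rdd = sc.parallelize(myList)      // partitions default to sc.defaultParallelism
rdd.getNumPartitions

val rdd8 = sc.parallelize(myList, 8)  // or ask for an explicit number
rdd8.getNumPartitions

sc.defaultMinPartitions               // minimum used when loading from a file system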
-----------------------------------------------------------------------------------------------------------------------------------------------------
Narrow Transformations v/s Wide Transformations
===============================================
Narrow
------
No shuffling
No stage changes
map
flatMap
filter
coalesce
Wide
----
Shuffling
Stage changes
reduceByKey
groupByKey
repartition (full shuffle)
Stages: these are marked by shuffle boundaries
  3 wide transformations give 4 stages
  The output of stage 1 is written to disk; stage 2 reads it back from the disk
TIP: use wide transformations as late in the code as possible
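A sketch of how stages form, assuming a shell session; the single wide transformation below splits the job into 2 stages:

CODE:
val counts = sc.textFile("/path")   // stage 1 starts here
  .filter(_.nonEmpty)               // narrow: stays in stage 1
  .map(x => (x, 1))                 // narrow: stays in stage 1
  .reduceByKey(_ + _)               // wide: shuffle boundary -> stage 2
counts.collect()                    // action: triggers 1 job with 2 stages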
-----------------------------------------------------------------------------------------------------------------------------------------------------
reduce V/S reduceByKey
======================
reduceByKey:
  Transformation
  Gives an RDD
  Works only on pair RDDs
reduce:
  Action
  Gives a local variable
Why is reduceByKey a transformation while reduce is an action?
-> reduce gives you a single output, which is very small
-> reduceByKey might still give huge data, and we might need to do more work on that data
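A short sketch of the difference, with illustrative data:

CODE:
val nums = sc.parallelize(List(1, 2, 3, 4))
val total = nums.reduce(_ + _)           // action: returns a local Int (10)

val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
val perKey = pairs.reduceByKey(_ + _)    // transformation: returns another pair RDD
perKey.collect()                         // Array((a,4), (b,2))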
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
groupByKey V/S reduceByKey
==========================
Both are wide transformations
reduceByKey:
  local aggregation first (like a combiner)
  less shuffling due to the local aggregation
groupByKey:
  no local aggregation
  all key-value pairs are shuffled, therefore more shuffling
  high chance of an out-of-memory error:
    the shuffled data sits on <no. of distinct keys> machines
Never use groupByKey when reduceByKey can do the aggregation
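A word-count sketch contrasting the two, with illustrative data:

CODE:
val pairs = sc.parallelize(List(("a", 1), ("a", 1), ("b", 1)))

// reduceByKey: aggregates locally within each partition before shuffling
val safe = pairs.reduceByKey(_ + _)

// groupByKey: shuffles every (key, value) pair, then aggregates
val risky = pairs.groupByKey().mapValues(_.sum)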
-----------------------------------------------------------------------------------------------------------------------------------------------------
Note for the localhost:4040 Spark UI visualization
  No. of jobs = no. of actions
  A new stage is created for every wide transformation
  A task corresponds to each partition
  Block size on a local machine = 32 MB
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
tuple of 2 V/S map of 2
=======================
tuple of 2:
  RDDs holding tuples of 2 are called pair RDDs
  Many transformations like reduceByKey, mapValues etc. work only on pair RDDs
  Keys can repeat
map of 2:
  Keys cannot be repeated; they need to be distinct
-----------------------------------------------------------------------------------------------------------------------------------------------------
Calling multiple actions
========================
When we call the first action:
  The DAG built from all the transformations is executed.
When we call the second action:
  All the transformations are executed again from the very beginning.
  However, Spark does some internal optimization:
    it skips stages, because a stage change means a disk I/O was done, so it reads the previous stage's output from disk and continues with the DAG.
Therefore, since every action re-executes the transformations from the very beginning, we should use cache and persist.
-----------------------------------------------------------------------------------------------------------------------------------------------------
cache V/S persist:
After doing a bunch of operations we can store the RDD, so that future transformations start from here and not from the very beginning.
cache:
  caches in memory
  If no memory is available it won't give an error, it just skips caching.
persist:
  various storage levels:
  persist() -> in memory (non-serialized / object format)
  persist(StorageLevel.DISK_ONLY) -> disk only (serialized / byte format)
  persist(StorageLevel.MEMORY_ONLY) -> memory only (non-serialized / object format); if no memory is available it won't error, it just skips caching
  persist(StorageLevel.MEMORY_AND_DISK) -> data is cached in memory; if there is not enough memory, evicted blocks are serialized to disk (recommended, as re-evaluation is expensive)
  persist(StorageLevel.OFF_HEAP) -> outside the JVM (raw memory outside the executor); avoids JVM garbage collection, which is time-consuming (unsafe operations but performant) (serialized)
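A short sketch of cache vs an explicit storage level (the path is illustrative; the import is the real one):

CODE:
import org.apache.spark.storage.StorageLevel

val cleaned = sc.textFile("/path").filter(_.nonEmpty)

cleaned.persist(StorageLevel.MEMORY_AND_DISK)   // evicted blocks spill to disk
// cleaned.cache()   // alternative: same as persist(StorageLevel.MEMORY_ONLY)

cleaned.count()      // first action materializes and stores the RDD
cleaned.count()      // later actions reuse the stored blocks
cleaned.unpersist()  // free the storage when done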
BLOCK EVICTION
==============
> Consider the situation in which some of the block partitions are so large (skew) that they quickly fill up the storage memory used for caching.
> When the storage memory becomes full, an eviction policy is used to make space for new blocks:
> LRU (Least Recently Used)
NOTE:
-----
Serialized: increases processing cost but reduces the memory footprint
Non-serialized: processing can be a bit faster but uses a bigger memory footprint
MEMORY_ONLY_SER: same as MEMORY_ONLY, but serialized
MEMORY_AND_DISK_SER: same as MEMORY_AND_DISK, but serialized
MEMORY_ONLY_2: indicates 2 replicas, each stored on a different worker node.
  Replication is useful for faster recovery in case of failure.
<RDD_name>.toDebugString: check the lineage graph; read from bottom to top.
-----------------------------------------------------------------------------------------------------------------------------------------------------
repartition V/S coalesce
========================
Both modify the no. of partitions.
repartition:
  val newRDD = oldRDD.repartition(newNum)
  can increase as well as decrease
  when decreasing:
    the intention is to give final partitions of almost equal size => full shuffling
coalesce:
  val newRDD = oldRDD.coalesce(newNumLessThanCurrent)
  can only decrease
  if you try increasing, there is no error, but the count won't increase
  preferred when decreasing, as it minimizes shuffling:
    it locally combines existing partitions where possible => less shuffling
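A short sketch, assuming an input RDD with many partitions:

CODE:
val rdd = sc.textFile("/path")      // say 100 partitions

val more  = rdd.repartition(200)    // full shuffle, ~equal-sized partitions
val fewer = rdd.coalesce(10)        // merges existing partitions locally, less shuffling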
-----------------------------------------------------------------------------------------------------------------------------------------------------
DAG V/S Lineage
===============
Lineage:
  <RDD_name>.toDebugString
  available right after a transformation, even before any action
  a dependency graph: shows the dependencies between the various RDDs
DAG:
  only visible after an action is called
  Directed Acyclic Graph
  jobs -> stages -> tasks
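A short lineage sketch; toDebugString works before any action runs:

CODE:
val rdd4 = sc.textFile("/path")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
println(rdd4.toDebugString)   // lineage graph, read from bottom to top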
-----------------------------------------------------------------------------------------------------------------------------------------------------
Making a jar file
=================
Export as jar:
  runnable jar: includes all dependencies (fat jar)
  jar: normal, without dependencies
spark-submit --class <classname> <path/to/jar>
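An illustrative submit command (class name and jar path are placeholders):

>>spark-submit --class com.example.WordCount --master local[*] /path/to/wordcount.jar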
-----------------------------------------------------------------------------------------------------------------------------------------------------
map V/S mapPartitions
=====================
Suppose an RDD has 10000 rows in total, divided into 10 partitions.
map
---
works on each input row
rdd.map() -> the function will be called 10000 times.
mapPartitions
-------------
works on each partition
rdd.mapPartitions() -> the function will be called 10 times, as there are only 10 partitions.
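A short sketch of the two, assuming rdd is an RDD[String]; mapPartitions receives a whole iterator, which is handy for per-partition setup such as one DB connection per partition:

CODE:
// map: the function runs once per row
val lengths = rdd.map(line => line.length)

// mapPartitions: the function runs once per partition
val lengths2 = rdd.mapPartitions { iter =>
  // per-partition setup could go here
  iter.map(line => line.length)
}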
-----------------------------------------------------------------------------------------------------------
SPARK-2
=======
-----------------------------------------------------------------------------------------------------------
Structured APIs
================
DataFrames and Datasets
DataFrames: a distributed collection of data organized into named columns.
  Equivalent to a table in SQL.
DF and DS existed in Spark 1 as well, but Spark 2 gives better support for them, and they are merged into a single API, the Dataset API.
Earlier, every context had to be created separately: SparkContext, HiveContext, SQLContext.
Now everything is provided under 1 unified umbrella: SparkSession.
SparkSession is a singleton object.
  .builder() returns a Builder object, which helps us configure the SparkSession:
    .appName
    .master
Treat the SparkSession as your driver.
Example: DataFramesExample
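A minimal SparkSession sketch (app name and path are illustrative):

CODE:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DataFramesExample")
  .master("local[*]")
  .getOrCreate()

// read a CSV into a DataFrame; inferring the schema triggers an extra pass over the data
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/path/to/file.csv")

df.show()
spark.stop()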
=====================================================================================================================================================
Transformations:
  flatMap
  map
  filter
  reduceByKey
  mapValues
  flatMapValues
  join
  coalesce
  repartition
  sortBy
Actions:
  collect
  reduce
  countByValue
  take(<a number>)
  saveAsTextFile("/path")
  count
  read // inferring the schema is another action
sortByKey:
  hybrid: considered a transformation, but it shows up in the Spark UI like an action, since part of it (sampling the data to compute the range boundaries) has to run eagerly