Scalable Performance of System S for
Extract-Transform-Load Processing
Toyotaro Suzumura, Toshihiro Yasue and
Tamiya Onodera
IBM Research - Tokyo
Outline
• Background and Motivation
• System S and its suitability for ETL
• Performance Evaluation of System S as a Distributed ETL Platform
• Performance Optimization
• Related Work and Conclusions
What is ETL?
ETL = Extraction + Transformation + Loading
• Extraction: extract data from different distributed data sources
• Transformation: cleanse and customize the data for the business needs and rules while transforming it to match the data warehouse schema
• Loading: load the data into the data warehouse
[Diagram: Data Sources → Extract → Transform → Load → Data Warehouse]
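For readers who prefer code to boxes, the following is a minimal sketch of the same three stages in plain Python (not SPADE); the file name, column names, and cleansing rules are illustrative only.

import csv

def extract(paths):
    # Extraction: pull rows out of several distributed source files.
    for path in paths:
        with open(path, newline="") as f:
            yield from csv.DictReader(f)

def transform(rows):
    # Transformation: cleanse and reshape each row to match the warehouse schema.
    for row in rows:
        if not row.get("item"):            # drop malformed records
            continue
        yield {"item": row["item"].strip().upper(),
               "on_hand": int(row.get("onhand", 0) or 0)}

def load(rows, warehouse):
    # Loading: append the transformed rows into the data warehouse (a list here).
    warehouse.extend(rows)

warehouse = []
load(transform(extract(["warehouse_20090901_1.csv"])), warehouse)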
Data Explosion in ETL
• Data Explosion
– The amount of data stored in a typical contemporary data warehouse may double every 12 to 18 months
• Data Source Examples:
– Logs for regulatory compliance (e.g. SOX)
– POS (point-of-sale) transactions of retail stores (e.g. Wal-Mart)
– Web data (e.g. internet auction sites such as eBay)
– CDRs (Call Detail Records) that telecom companies analyze to understand customer behavior
– Trading data
Near-Real Time ETL
• Given the data explosion problem, there is a strong need for ETL processing to be as fast as possible so that business analysts can quickly grasp trends in customer activities
Our Motivation
• Assess the applicability of System S, a data stream processing system, to ETL processing, considering both qualitative and quantitative ETL constraints
• Thoroughly evaluate the performance of System S as a scalable and distributed ETL platform to achieve "Near-Real Time ETL" and address the data explosion in the ETL domain
Outline
• Background and Motivation
• System S and its suitability for ETL
• Performance Evaluation of System S as a Distributed ETL Platform
• Performance Optimization
• Related Work and Conclusions
Stream Computing and System S
• System S: stream computing middleware developed by IBM Research
• System S is now productized as "InfoSphere Streams"
Traditional computing: fact finding with data-at-rest. Stream computing: insights from data in motion.
InfoSphere Streams Programming Model
[Diagram: application programming in SPADE draws on source adapters, an operator repository, and sink adapters, followed by platform-optimized compilation]
SPADE: Advantages of Stream Processing as a Parallelization Model
• A stream-centric programming language dedicated to data stream processing
• Streams as first-class entities
– Explicit task and data parallelism
– An intuitive way to exploit multi-core machines and multiple nodes
• Operator and data source profiling for better resource management
• Reuse of operators across stored and live data
• Support for user-customized operators (UDOPs)
A simple SPADE example
(Flow: Source → Aggregate → Functor → Sink)
[Application]
SourceSink trace
[Nodepool]
Nodepool np := ("host1", "host2", "host3")
[Program]
// virtual schema declaration
vstream Sensor (id : id_t, location : Double, light : Float, temperature : Float, timestamp : timestamp_t)
// a source stream is generated by a Source operator – in this case tuples come from an input file
stream SenSource ( schemaof(Sensor) )
  := Source( ) [ "file:///SenSource.dat" ] {}
  -> node(np, 0)
// this intermediate stream is produced by an Aggregate operator, using the SenSource stream as input
stream SenAggregator ( schemaof(Sensor) )
  := Aggregate( SenSource <count(100),count(1)> ) [ id . location ]
     { Any(id), Any(location), Max(light), Min(temperature), Avg(timestamp) }
  -> node(np, 1)
// this intermediate stream is produced by a Functor operator
stream SenFunctor ( id: Integer, location: Double, message: String )
  := Functor( SenAggregator ) [ log(temperature,2.0)>6.0 ]
     { id, location, "Node "+toString(id)+" at location "+toString(location) }
  -> node(np, 2)
// result management is done by a Sink operator – in this case produced tuples are sent to a socket
Null := Sink( SenFunctor ) [ "udp://192.168.0.144:5500/" ] {}
  -> node(np, 0)
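For readers unfamiliar with SPADE, the following is a rough Python analogue of what the pipeline above does; the per-(id, location) grouping and the UDP sink are simplified away, and the tuple field names mirror the virtual schema.

from collections import deque
import math

def run(sensor_tuples):
    # Sliding window of 100 tuples advancing by 1, like <count(100),count(1)>.
    window = deque(maxlen=100)
    for t in sensor_tuples:
        window.append(t)
        first = window[0]                                         # Any(id), Any(location)
        agg = {
            "id": first["id"],
            "location": first["location"],
            "light": max(w["light"] for w in window),             # Max(light)
            "temperature": min(w["temperature"] for w in window), # Min(temperature)
            "timestamp": sum(w["timestamp"] for w in window) / len(window),  # Avg(timestamp)
        }
        # Functor step: keep tuples with log2(temperature) > 6 and build the message.
        if agg["temperature"] > 0 and math.log(agg["temperature"], 2.0) > 6.0:
            print(f"Node {agg['id']} at location {agg['location']}")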
InfoSphere Streams Runtime
Optimizing scheduler assigns operators
to processing nodes, and continually
manages resource allocation
[Diagram: Processing Element Containers running on the Streams Data Fabric transport, distributed across heterogeneous hardware such as x86, Cell, and FPGA blades]
System S as a Distributed ETL Platform?
Can we use System S as a distributed ETL processing platform?
Outline
• Background and Motivation
• System S and its suitability for ETL
• Performance Evaluation of System S as a Distributed ETL Platform
• Performance Optimization
• Related Work and Conclusions
Target Application for Evaluation
Inventory processing for multiple warehouses, which includes most of the representative ETL primitives (Sort, Join, and Aggregate); a small Python illustration of these primitives follows below.
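As a rough illustration of those three primitives, here is a small Python example (not SPADE); the inventory rows and item descriptions are made up, while the item codes follow the format used later in the data-flow diagrams.

from itertools import groupby

inventory = [                      # hypothetical warehouse item rows
    {"item": "0100-0300-00", "onhand": 3},
    {"item": "0100-0900-00", "onhand": 5},
    {"item": "0100-0300-00", "onhand": 2},
]
catalog = {"0100-0300-00": "bolt", "0100-0900-00": "nut"}   # hypothetical item catalog

# Sort on the join key.
inventory.sort(key=lambda r: r["item"])

# Left outer join against the item catalog.
joined = [dict(r, description=catalog.get(r["item"])) for r in inventory]

# Aggregate: total on-hand quantity per item.
totals = {item: sum(r["onhand"] for r in rows)
          for item, rows in groupby(joined, key=lambda r: r["item"])}
print(totals)    # {'0100-0300-00': 5, '0100-0900-00': 5}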
SPADE Program for Distributed Processing
[Diagram: a data distribution host runs Source operators over the warehouse item files (Warehouse_20090901_1.txt, _2.txt, _3.txt), bundles them, and applies a Split keyed on the item code (around 60 distinct key values, e.g. 0100-0300-00, 0100-0900-00); each of the N compute hosts then runs a pipeline of Sort, Join (against an item catalog read by its own Source), Aggregate, a UDOP that splits duplicated tuples, Functor, Sink, and ODBC Append operators]
SPADE Program (1/2)
[Nodepools]
nodepool np[] := ("s72x336-00", "s72x336-02",
"s72x336-03", "s72x336-04")
[Program]
vstream Warehouse1Schema(id: Integer, item : String, Onhand : String,
allocated : String, hardAllocated : String,
fileNameColumn : String)
vstream Warehouse2OutputSchema(id: Integer, item : String,
Onhand : String,
allocated : String, hardAllocated : String,
fileNameColumn : String, description: StringList)
vstream ItemSchema(item: String, description: StringList)
##===================================================
## warehouse 1
##===================================================
bundle warehouse1Bundle := ()
for_begin @i 1 to 3
stream Warehouse1Stream@i(schemaFor(Warehouse1Schema))
:= Source()["file:///SOURCEFILE", nodelays, csvformat]{}
-> node(np, 0), partition["Sources"]
warehouse1Bundle += Warehouse1Stream@i
for_end
## stream for computing subindex
stream StreamWithSubindex(schemaFor(Warehouse1Schema), subIndex: Integer)
:= Functor(warehouse1Bundle[:])[] {
subIndex := (toInteger(strSubstring(item, 6,2)) / (60 / COMPUTE_NODE_NUM))-2 }
-> node(np, 0), partition["Sources"]
for_begin @i 1 to COMPUTE_NODE_NUM
stream ItemStream@i(schemaFor(Warehouse1Schema), subIndex:Integer)
for_end
:= Split(StreamWithSubindex) [ subIndex ]{}
-> node(np, 0), partition["Sources"]
for_begin @i 1 to COMPUTE_NODE_NUM
stream Warehouse1Sort@i(schemaFor(Warehouse1Schema))
:= Sort(ItemStream@i <count(SOURCE_COUNT@i)>)[item, asc]{}
-> node(np, @i-1), partition["CMP%@i"]
stream Warehouse1Filter@i(schemaFor(Warehouse1Schema))
:= Functor(Warehouse1Sort@i)[ Onhand="0001.000000" ] {}
-> node(np, @i-1), partition["CMP%@i"]
Nil := Sink(Warehouse1Filter@i)["file:///WAREHOUSE1_OUTPUTFILE@i",
csvFormat, noDelays]{}
-> node(np, @i-1), partition["CMP%@i"]
for_end
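The Split above routes each tuple to a compute host using a subIndex derived from two digits of the item code. Below is a rough Python rendering of that range-partitioning step; the string indexing and the constant 60 follow the SPADE snippet (which assumes roughly 60 distinct item groups), though the exact SPADE substring semantics may differ.

COMPUTE_NODE_NUM = 4   # number of compute hosts, as in the SPADE program

def sub_index(item: str) -> int:
    # Mimic the Functor: take two digits of the item code (values roughly 0..59)
    # and range-partition them over the compute hosts.
    bucket = int(item[6:8])                        # ~ strSubstring(item, 6, 2)
    return bucket // (60 // COMPUTE_NODE_NUM) - 2  # same formula as the Functor above

# Tuples with equal subIndex go to the same compute host, so each host can
# Sort/Join/Aggregate its own key range independently.
print(sub_index("0100-0300-00"))   # -> 0 with this indexing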
SPADE Program (2/2)
##====================================================
## warehouse 2
##====================================================
stream ItemsSource(schemaFor(ItemSchema))
:= Source()["file:///ITEMS_FILE", nodelays, csvformat]{}
-> node(np, 1), partition["ITEMCATALOG"]
stream SortedItems(schemaFor(ItemSchema))
:= Sort(ItemsSource <count(ITEM_COUNT)>)[item, asc]{}
-> node(np, 1), partition["ITEMCATALOG"]
for_begin @i 1 to COMPUTE_NODE_NUM
stream JoinedItem@i(schemaFor(Warehouse2OutputSchema))
:= Join(Warehouse1Sort@i <count(SOURCE_COUNT@i)>;
SortedItems <count(ITEM_COUNT)>)
[ LeftOuterJoin, {item} = {item} ]{}
-> node(np, @i-1), partition["CMP%@i"]
##=================================================
## warehouse 3
##=================================================
for_begin @i 1 to COMPUTE_NODE_NUM
stream SortedItems@i(schemaFor(Warehouse2OutputSchema))
:= Sort(JoinedItem@i <count(JOIN_COUNT@i)>)[id, asc]{}
-> node(np, @i-1), partition["CMP%@i"]
stream AggregatedItems@i(schemaFor(Warehouse2OutputSchema),
count: Integer)
:= Aggregate(SortedItems@i <count(JOIN_COUNT@i)>)
[item . id]
{ Any(id), Any(item), Any(Onhand), Any(allocated),
Any(hardAllocated), Any(fileNameColumn), Any(description), Cnt() }
-> node(np, @i-1), partition["CMP%@i"]
stream JoinedItem2@i(schemaFor(Warehouse2OutputSchema), count: Integer)
:= Join(SortedItems@i <count(JOIN_COUNT@i)>;
AggregatedItems@i <count(AGGREGATED_ITEM@i)>)
[ LeftOuterJoin, {id, item} = {id, item} ] {}
-> node(np, @i-1), partition["CMP%@i"]
stream SortJoinedItem@i(schemaFor(Warehouse2OutputSchema), count: Integer)
:= Sort(JoinedItem2@i <count(JOIN_COUNT@i)>)[id(asc).fileNameColumn(asc)]{}
-> node(np, @i-1), partition["CMP%@i"]
stream DuplicatedItems@i(schemaFor(Warehouse2OutputSchema), count: Integer)
stream UniqueItems@i(schemaFor(Warehouse2OutputSchema), count: Integer)
:= Udop(SortJoinedItem@i)["FilterDuplicatedItems"]{}
-> node(np, @i-1), partition["CMP%@i"]
Nil := Sink(DuplicatedItems@i)["file:///DUPLICATED_FILE@i", csvFormat, noDelays]{}
-> node(np, @i-1), partition["CMP%@i"]
stream FilterStream@i(item: String, recorded_indicator: Integer)
:= Functor(UniqueItems@i)[] { item, 1 }
-> node(np, @i-1), partition["CMP@i"]
stream AggregatedItems2@i(LoadNum: Integer, Item_Load_Count: Integer)
:= Aggregate(FilterStream@i <count(UNIQUE_ITEM@i)>)
[ recorded_indicator ]
{ Any(recorded_indicator), Cnt() }
-> node(np, @i-1), partition["CMP@i"]
stream AddTimeStamp@i(LoadNum: Integer, Item_Load_Count: Integer, LoadTimeStamp
:= Functor(AggregatedItems2@i)[] { LoadNum, Item_Load_Count, timeStampMicrosecon
-> node(np, @i-1), partition["CMP@i"]
Nil := Sink(AddTimeStamp@i)["file:///final_result.out", csvFormat, noDelays]{}
-> node(np, @i-1), partition["CMP@i"]
for_end
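The only user-defined operator in the program is the "FilterDuplicatedItems" UDOP above, which splits the sorted stream into duplicated and unique tuples. A minimal Python sketch of that idea, assuming the duplicate key is (id, item), the same key used in the preceding Join:

from itertools import groupby

def filter_duplicated_items(sorted_rows):
    # Split a stream sorted on (id, item) into unique and duplicated tuples.
    unique, duplicated = [], []
    for _, group in groupby(sorted_rows, key=lambda r: (r["id"], r["item"])):
        rows = list(group)
        (unique if len(rows) == 1 else duplicated).extend(rows)
    return unique, duplicated

rows = [{"id": 1, "item": "A"}, {"id": 1, "item": "A"}, {"id": 2, "item": "B"}]
print(filter_duplicated_items(rows))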
Qualitative Evaluation of SPADE
• Implementation
– Lines of SPADE: 76 lines
– Number of operators: 19 (including 1 UDOP operator)
• Evaluation
– With the built-in operators of SPADE, we could develop the given ETL scenario in a highly productive manner
– The ability of System S to run a SPADE program on distributed nodes was a great help
Performance Evaluation
Total nodes: 14 nodes, 56 CPU cores
Spec per node: Intel Xeon X5365 3.0 GHz (4 physical cores with HT), 16 GB memory, RHEL 5.3 64-bit (Linux kernel 2.6.18-164.el5)
Network: Infiniband (DDR, 20 Gbps) or 1 Gbps network
Software: InfoSphere Streams (beta version)
Data: 9 million records (1 record is around 100 bytes)
[Diagram: 14 nodes (e0101b0${n}e1, 4 cores each); the first nodes handle data distribution and item sorting, and 10 nodes (40 cores) serve as compute hosts]
Node Assignment
[Diagram: Node Assignment A — 14 nodes (4 cores each); the head nodes handle data distribution and item sorting (one node is not used), and 10 nodes (40 cores) serve as compute hosts]
Throughput for Processing 9 Million Records
Maximum throughput: around 180,000 records per second (144 Mbps); see the conversion sketch below.
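The bandwidth figures quoted here and later in the deck follow directly from the record size of roughly 100 bytes; a quick conversion:

def throughput_mbps(records_per_sec: int, record_bytes: int = 100) -> float:
    # Convert a record rate into megabits per second.
    return records_per_sec * record_bytes * 8 / 1_000_000

print(throughput_mbps(180_000))   # 144.0 Mbps, the maximum measured here
print(throughput_mbps(800_000))   # 640.0 Mbps, after the data distribution optimization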
[Chart: throughput (records/s) and speedup relative to 4 cores vs. number of cores (4 to 40); Node Assignment A, Infiniband]
Analysis (I-a): Breakdown of the Total Time
Data distribution is dominant.
[Chart: elapsed time (s) for processing 9M records, broken down into computation time and data distribution time, vs. number of cores (4 to 40); Node Assignment A, Infiniband]
Analysis (I-b): Speed-up ratio against 4 cores when focusing only on the computation part
[Chart: speed-up of the computation part relative to 4 cores vs. number of cores (4 to 40), compared with linear speedup; the computation part scales beyond linear; Node Assignment A, Infiniband]
CPU Utilization at Compute Hosts
[Chart: CPU utilization at the compute hosts over time, showing computation phases separated by idle periods]
Outline
• Background and Motivation
• System S and its suitability for ETL
• Performance Evaluation of System S as a Distributed ETL Platform
• Performance Optimization
• Related Work and Conclusions
Performance Optimization
• The previous experiment shows that most of the time is spent in data distribution and I/O processing
• For performance optimization, we implemented a SPADE program in which all the nodes participate in the data distribution, and each Source operator is responsible only for its own chunk of the data records, i.e. the total divided by the number of source operators (see the sketch below)
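A small Python sketch of that division of labor (the record count and source count are taken from the experiments; everything else is illustrative): each of the N Source operators reads only its own contiguous chunk of the input.

def chunk_bounds(total_records: int, num_sources: int):
    # Assign each source operator a contiguous range of record indices.
    base, extra = divmod(total_records, num_sources)
    start = 0
    for i in range(num_sources):
        size = base + (1 if i < extra else 0)
        yield i, start, start + size          # (source id, first record, one past last)
        start += size

# With 9M records and 9 source operators, each source reads 1M records in parallel.
for source_id, lo, hi in chunk_bounds(9_000_000, 9):
    print(f"source {source_id}: records [{lo}, {hi})")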
Performance Optimization
1. We modified the SPADE data-flow program so that multiple Source operators participate in the data distribution
2. Each data distribution node reads a chunk of the whole data
[Diagram: the original SPADE program, in which a single data distribution host feeds all compute hosts, versus the optimized program, in which every compute host also runs its own Source and Split operators over its chunk of the warehouse items; the compute pipelines (Sort, Join, Aggregate, UDOP SplitDuplicatedTuples, Functor, Sink, ODBC Append) are unchanged]
Node Assignment
• All 14 nodes participate in the data distribution
• Each Source operator reads the total data records (9M) divided by the number of source operators
• The node assignment for the compute nodes is the same as in Experiment I
[Diagram: 14 nodes (4 cores each, each with its own disk), all acting as both data distribution and compute hosts]
Elapsed time with varying number of compute
nodes and source operators
[Chart: elapsed time (s) for processing 9M records vs. number of source operators (3 to 45) and number of compute nodes (20, 24, 28, 32); Node Assignment C, Infiniband]
Throughput: Over 800,000 Records per Second
[Chart: throughput (records/s) vs. number of source nodes (3 to 45) for 20, 24, 28, and 32 compute cores; Node Assignment C, Infiniband]
Scalability: Achieved Super-Linear Speedup with Data Distribution Optimization
[Chart: speedup relative to 4 cores vs. number of compute nodes (4 to 32) for 1 source operator, 3 source operators, the optimized configuration (9 source operators for 20, 24, 28, 32), a further optimized configuration, and linear scaling; the optimized configurations exceed linear speedup; Node Assignment C, Infiniband]
Outline
• Background and Motivation
• System S and its suitability for ETL
• Performance Evaluation of System S as a Distributed ETL Platform
• Performance Optimization
• Related Work and Conclusions
Related Work
• Near Real-Time ETL
– Panos et al. reviewed the state of the art of both conventional and near real-time ETL [2008 Springer]
• ETL Benchmarking
– Wyatt et al. identify common characteristics of ETL workflows in an effort to propose a unified evaluation method for ETL [2009 Springer Lecture Notes]
– TPC-ETL: formed in 2008 and still under development by the TPC subcommittee
Conclusions and Future Work
• Conclusions
– Demonstrated the software productivity and scalable performance of System S in the ETL domain
– After the data distribution optimization, we achieved super-linear scalability, processing around 800,000 records per second on 14 nodes
• Future Work
– Comparison with existing ETL tools/systems and various application scenarios (TPC-ETL?)
– Automatic data distribution optimization
Future Direction:
Automatic Data Distribution Optimization
• We were able to identify the appropriate number of source operators through a series of long-running experiments.
• However, it is not wise for a distributed system such as System S to force users/developers to find the appropriate number of source nodes experimentally.
• We will need an automatic optimization mechanism that maximizes throughput by finding the best number of source nodes automatically, in a manner seamless to the user (a minimal sketch follows below).
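The mechanism we have in mind could start from something as simple as the sketch below (Python, with a hypothetical measure_throughput hook standing in for an actual System S run or the compiler's profiling statistics): try candidate source counts and keep the one with the highest measured throughput.

from typing import Callable, Iterable, Optional

def pick_source_count(candidates: Iterable[int],
                      measure_throughput: Callable[[int], float]) -> Optional[int]:
    # Return the candidate number of source operators with the highest throughput.
    # measure_throughput(n) is a hypothetical hook that would deploy the SPADE
    # program with n source operators (or consult profiling data) and report
    # records per second.
    best_n, best_rate = None, float("-inf")
    for n in candidates:
        rate = measure_throughput(n)
        if rate > best_rate:
            best_n, best_rate = n, rate
    return best_n

# Example with a stand-in measurement function (not real System S data):
print(pick_source_count([3, 6, 9, 12, 15], lambda n: min(n, 9) * 90_000))   # -> 9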
[Diagram: a node pool in which source operators S1..Sn, each reading a data chunk d1..dn, feed compute operators C1..Cm; the optimizer chooses the number of source operators n]
Questions?
Thank You
Backup
Towards Adaptive Optimization
• The current SPADE compiler has a compile-time optimizer that collects statistical data such as tuple/byte rates and the CPU ratio for each operator.
• We would like to let users/developers write a SPADE program in the manner shown on the left, without considering data partitioning and data distribution.
• By extending the current optimizer, the system could automatically convert the program on the left-hand side into the one on the right-hand side, which achieves the maximum data distribution.
[Diagram: the data distribution optimizer rewrites the original SPADE program, which has a single Source operator feeding compute operators C1..Cm, into an optimized program with source operators S1..Sn each reading a chunk d1..dn]
Executive Summary
Motivation:
• Evaluate System S as an ETL platform in a large experimental environment, the Watson cluster
• Understand the performance characteristics at such a large testbed, such as scalability and performance bottlenecks
Findings:
• A series of experiments showed that the data distribution cost is dominant in ETL processing
• The optimized version shows that changing the number of data feed (source) operators dramatically increases throughput and obtains higher speed-ups than the other configurations
• Using the Infiniband network is critical for an ETL workload that includes a barrier before aggregating all the data for the sorting operation; we achieved almost double the performance of the 1 Gbps network
[Charts: elapsed time for the baseline (computation vs. data distribution time), speedup comparison among the various optimizations, and throughput comparison between the 1 Gbps network and the Infiniband network with varying numbers of source nodes]
Node Assignment (B) for Experiment II
The experimental environment comprises 3 source nodes for data distribution, 1 node for item sorting, and 10 nodes for computation. Each compute node has 4 cores, and we manually allocate each operator with the following scheduling policy. The following diagram shows the case in which 32 operators are used for the computation; operators are allocated to adjacent nodes in order.
[Diagram: Node Assignment B — 14 nodes (4 cores each): 3 data distribution nodes, 1 item-sorting node, and 10 compute hosts (40 cores)]
SPADE Program with Data Distribution Optimization
Since 3 nodes participate in the data distribution, the number of communication channels is at most 120 (3 x 40).
[Diagram: optimized data flow in which three data distribution hosts each run a Source, Functor, and Split over their own warehouse items file and feed the Sort / Join / Aggregate / UDOP (SplitDuplicatedTuples) / Functor / Sink / ODBC Append pipelines running on the compute hosts]
New SPADE Program
Experiment I (original):
bundle warehouse1Bundle := ()
for_begin @i 1 to 3
  stream Warehouse1Stream@i(schemaFor(Warehouse1Schema))
    := Source()["file:///SOURCEFILE", nodelays, csvformat]{}
    -> node(np, 0), partition["Sources"]
  warehouse1Bundle += Warehouse1Stream@i
for_end

After (optimized):
#define SOURCE_NODE_NUM 3
for_begin @j 1 to COMPUTE_NODE_NUM
  bundle warehouse1Bundle@j := ()
for_end
for_begin @i 0 to SOURCE_NODE_NUM-1
  stream Warehouse1Stream@i(schemaFor(Warehouse1Schema))
    := Source()["file:///SOURCEFILE", nodelays, csvformat]{}
    -> node(SourcePool, @i), partition["Sources@i"]
  stream StreamWithSubindex@i(schemaFor(Warehouse1Schema), subIndex: Integer)
    := Functor(Warehouse1Stream@i)[] {
         subIndex := (toInteger(strSubstring(item, 6,2)) / (60 / COMPUTE_NODE_NUM)) }
    -> node(SourcePool, @i), partition["Sources@i"]
  for_begin @j 1 to COMPUTE_NODE_NUM
    stream ItemStream@i@j(schemaFor(Warehouse1Schema), subIndex:Integer)
  for_end
    := Split(StreamWithSubindex@i) [ subIndex ]{}
    -> node(SourcePool, @i), partition["Sources@i"]
  for_begin @j 1 to COMPUTE_NODE_NUM
    warehouse1Bundle@j += ItemStream@i@j
  for_end
for_end
for_begin @j 1 to COMPUTE_NODE_NUM
  stream StreamForWarehouse1Sort@j(schemaFor(Warehouse1Schema))
    := Functor(warehouse1Bundle@j[:])[]{}
    -> node(np, @j-1), partition["CMP%@j"]
  stream Warehouse1Sort@j(schemaFor(Warehouse1Schema))
    := Sort(StreamForWarehouse1Sort@j <count(SOURCE_COUNT@j)>)[item, asc]{}
    -> node(np, @j-1), partition["CMP%@j"]
  ...
(warehouse 2, 3, and 4 are omitted in this chart, but we executed them for the experiment)
Node Assignment (C) for Experiment III
• All 14 nodes participate in the data distribution, and each Source operator is assigned as described in the following diagram. For instance, 24 Source operators are allocated to the nodes in order; once 14 source operators have been assigned to the 14 nodes, the next source operator is assigned to the first node again.
• Each Source operator reads the total data records (9M records) divided by the number of source operators. This data division is done beforehand using the Linux tool "split".
• The node assignment for the compute nodes is the same as in Experiment I.
[Diagram: Node Assignment C — all 14 nodes (4 cores each, each with its own disk) serve as both data distribution and compute hosts]
Performance Result for Experiment II and Comparison with Experiment I
When 3 nodes participate in the data distribution, the throughput almost doubles compared with the result of Experiment I.
[Chart: throughput (records/s) and speed-up ratio against the non-optimized version vs. number of cores (4 to 32), comparing non-optimization with the I/O optimization; Node Assignment B, Infiniband]
Analysis (II-a): Optimization by changing the number of source operators
Motivation for this experiment
• In the previous page, the throughput saturates around 16 cores because the data feeding rate cannot keep up with the computation.
Experimental environment
• We changed the number of source operators while keeping the total volume of data constant (9M records), and measured the throughput.
• We only tested 9MDATA-32 (32 operators for computation).
• Node assignment: 9 data distribution nodes (Node Assignment B).
Experimental results
• The configuration with 9 source nodes obtains the best throughput.
[Charts: node assignment with 9 data distribution nodes; throughput (records/s) with varying number of source operators (3, 5, 9, 15), with the total data fixed at 9M records and 32 compute cores]
Analysis (II-b): Increased Throughput by Data Distribution Optimization
• The following graph shows the overall results of applying the same optimization approach as in the previous experiment, i.e., increasing the number of source operators.
• 3 source operators are used for 4, 8, 12, and 16 cores, and 9 source operators are used for 20, 24, 28, and 32 cores.
• We achieved a 5.84x speedup against 4 cores at 32 cores.
[Chart: throughput (data records/s) vs. number of cores (4 to 32) for 1 source operator, 3 source operators, and the optimized configuration (9 source operators for 20, 24, 28, 32); Node Assignment B, Infiniband]
Analysis (II-c): Increased Throughput by Data Distribution Optimization
The yellow line (the optimized configuration) shows the best performance, since 9 nodes participate in the data distribution for 20, 24, 28, and 32 cores.
[Chart: speedup against 4 cores vs. number of cores (4 to 32) for 1 source operator, 3 source operators, the optimized configuration (9 source operators for 20, 24, 28, 32), and ideal scalability; Node Assignment B, Infiniband]
Experiment (III): Increasing More Source Operators
• Motivation
– In this experiment, we study the performance characteristics when increasing the number of source operators beyond the previous experiment (Experiment II).
– We also compare the performance of the Infiniband network and the commodity 1 Gbps network.
• Experimental Setting
– We increase the number of source operators from 3 up to 45, and test this configuration with a relatively large number of compute nodes: 20, 24, 28, and 32.
– The node assignment for data distribution and computation is the same as in the previous experiment (Experiment II).
Analysis (III-a): Throughput and Elapsed Time
The maximum total throughput, around 640 Mbps (800,000 tuples/s x 100 bytes/tuple), is below the network bandwidth of both Infiniband and the 1 Gbps LAN.
[Chart: throughput (records/s) and elapsed time vs. number of source nodes (3 to 45) for 20, 24, 28, and 32 compute cores; Node Assignment C, Infiniband]
Analysis (III-c): Performance Without Infiniband
• In this experiment, we measured the throughput without Infiniband, with varying numbers of source operators.
• The throughput is around 400,000 data records per second at maximum, which accounts for around 360 Mbps.
• Although the network used in this experiment is 1 Gbps, this appears to be the practical upper limit once the System S overhead is taken into account.
• Unlike the performance obtained with Infiniband, the throughput saturates at around 12 to 15 source operators.
• A drastic performance degradation from 15 to 18 source operators can be observed; we assume this is because 14 source operators are allocated to the 14 nodes and, beyond that, 2 or more operators (processes) simultaneously access the 1 Gbps network card, causing resource contention.
[Charts: throughput (records/s) and elapsed time (s) for processing 9M records vs. number of source nodes (2 to 45) for 20, 24, 28, and 32 compute cores, without Infiniband; Node Assignment C]
Analysis (III-d): Comparison between W/O Infiniband and W/ Infiniband
This chart compares the performance with the Infiniband network enabled and disabled. The absolute throughput with Infiniband is roughly double that without Infiniband. This result indicates that using Infiniband is essential to obtain high throughput in ETL-type workloads.
[Charts: throughput vs. number of source nodes (up to 45) for 20, 24, 28, and 32 compute cores, without Infiniband (left) and with Infiniband (right); Node Assignment C]
Analysis (I-c): Elapsed Time for Distributing 9M Records to Multiple Cores
The following graph demonstrates that the elapsed time for distributing all the data to a varying number of compute cores is nearly constant.
[Chart: elapsed time (s) for distributing 9 million records vs. number of cores (4 to 40); Node Assignment A, Infiniband]