IBM Research - Tokyo
Scalable Performance of System S for Extract-Transform-Load Processing
Toyotaro Suzumura, Toshihiro Yasue and Tamiya Onodera
IBM Research - Tokyo
© International Business Machines Corporation 2008

Outline
– Background and Motivation
– System S and its Suitability for ETL
– Performance Evaluation of System S as a Distributed ETL Platform
– Performance Optimization
– Related Work and Conclusions

What is ETL?
ETL = Extraction + Transformation + Loading
– Extraction: extract data from different, distributed data sources
– Transformation: cleanse and customize the data for business needs and rules while transforming it to match the data warehouse schema
– Loading: load the data into the data warehouse
[Diagram: data sources → Extract → Transform → Load → data warehouse]

Data Explosion in ETL
Data explosion
– The amount of data stored in a typical contemporary data warehouse may double every 12 to 18 months
Data source examples:
– Logs for regulatory compliance (e.g. SOX)
– POS (point-of-sale) transactions of retail stores (e.g. Wal-Mart)
– Web data (e.g. internet auction sites such as eBay)
– CDRs (call detail records) that telecom companies analyze to understand customer behavior
– Trading data

Near-Real-Time ETL
Given the data explosion problem, there is a strong need for ETL processing to be as fast as possible, so that business analysts can quickly grasp trends in customer activity.

Our Motivation
– Assess the applicability of System S, a data stream processing system, to ETL processing, considering both qualitative and quantitative ETL constraints
– Thoroughly evaluate the performance of System S as a scalable, distributed ETL platform to achieve "near-real-time ETL" and address the data explosion in the ETL domain

Outline – next: System S and its Suitability for ETL

Stream Computing and System S
– System S: stream computing middleware developed by IBM Research
– System S is now productized as "InfoSphere Streams"
– Traditional computing: fact finding with data at rest. Stream computing: insights from data in motion.
InfoSphere Streams Programming Model
[Diagram: applications are written in SPADE against source adapters, an operator repository, and sink adapters; the platform performs optimized compilation onto the runtime.]

SPADE: Advantages of Stream Processing as a Parallelization Model
– A stream-centric programming language dedicated to data stream processing
– Streams as first-class entities
  – Explicit task and data parallelism
  – An intuitive way to exploit multiple cores and multiple nodes
– Operator and data source profiling for better resource management
– Reuse of operators across stored and live data
– Support for user-customized operators (UDOPs)

A Simple SPADE Example
[Application] SourceSink trace

[Nodepools]
nodepool np := ("host1", "host2", "host3")

[Program]
// virtual schema declaration
vstream Sensor (id: id_t, location: Double, light: Float, temperature: Float, timestamp: timestamp_t)

// a source stream is generated by a Source operator – in this case tuples come from an input file
stream SenSource ( schemaof(Sensor) )
  := Source( ) [ "file:///SenSource.dat" ] {}
  -> node(np, 0)

// this intermediate stream is produced by an Aggregate operator, using the SenSource stream as input
stream SenAggregator ( schemaof(Sensor) )
  := Aggregate( SenSource <count(100),count(1)> ) [ id . location ]
     { Any(id), Any(location), Max(light), Min(temperature), Avg(timestamp) }
  -> node(np, 1)

// this intermediate stream is produced by a Functor operator
stream SenFunctor ( id: Integer, location: Double, message: String )
  := Functor( SenAggregator ) [ log(temperature,2.0)>6.0 ]
     { id, location, "Node "+toString(id)+" at location "+toString(location) }
  -> node(np, 2)

// result management is done by a Sink operator – in this case produced tuples are sent to a socket
Null := Sink( SenFunctor ) [ "udp://192.168.0.144:5500/" ] {}
  -> node(np, 0)
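The SPADE snippet above is compact but dense. Purely as a reading aid (this is Python, not SPADE, and not part of the deck), the following sketch mimics what the pipeline computes, assuming the <count(100),count(1)> window means a per-(id, location) 100-tuple window sliding by one tuple:

import math
from collections import defaultdict, deque

WINDOW = 100  # mirrors <count(100), count(1)>: 100-tuple window, sliding by 1

windows = defaultdict(lambda: deque(maxlen=WINDOW))  # one window per (id, location) group

def aggregate(tup):
    # Per-group windowed aggregation, as in the Aggregate operator:
    # Max(light), Min(temperature), Avg(timestamp).
    key = (tup["id"], tup["location"])
    win = windows[key]
    win.append(tup)
    if len(win) < WINDOW:
        return None
    return {
        "id": tup["id"],
        "location": tup["location"],
        "light": max(t["light"] for t in win),
        "temperature": min(t["temperature"] for t in win),
        "timestamp": sum(t["timestamp"] for t in win) / WINDOW,
    }

def functor(agg):
    # Filter and reshape, as in the Functor operator: keep log(temperature, 2.0) > 6.0.
    if agg is None or math.log(agg["temperature"], 2.0) <= 6.0:
        return None
    return (agg["id"], agg["location"],
            "Node " + str(agg["id"]) + " at location " + str(agg["location"]))

def sink(out):
    # Stand-in for the UDP Sink: just print the produced tuple.
    if out is not None:
        print(out)

# Usage sketch (read_sensor_file is a hypothetical helper standing in for the Source):
#   for tup in read_sensor_file("SenSource.dat"):
#       sink(functor(aggregate(tup)))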
InfoSphere Streams Runtime
An optimizing scheduler assigns operators to processing nodes and continually manages resource allocation.
[Diagram: Processing Element Containers running across a heterogeneous cluster (x86, Cell, and FPGA blades), connected by the Streams Data Fabric transport.]

System S as a Distributed ETL Platform?
Can we use System S as a distributed ETL processing platform?

Outline – next: Performance Evaluation of System S as a Distributed ETL Platform

Target Application for Evaluation
Inventory processing for multiple warehouses that includes most of the representative ETL primitives (Sort, Join, and Aggregate).

SPADE Program for Distributed Processing
[Diagram: a data distribution host runs three Source operators reading Warehouse_20090901_1.txt, Warehouse_20090901_2.txt and Warehouse_20090901_3.txt (6 million records, bundled), a Functor, and a Split operator keyed by item (around 60 key groups such as 0100-0300-00 and 0100-0900-00). Each of the N compute hosts runs Sort → Join (with a sorted Item Catalog read on a separate host) → Sort → Aggregate → Join → Sort → UDOP (SplitDuplicatedTuples) → Functor → Sinks / ODBC Append.]

SPADE Program (1/2)
[Nodepools]
nodepool np[] := ("s72x336-00", "s72x336-02", "s72x336-03", "s72x336-04")

[Program]
vstream Warehouse1Schema(id: Integer, item: String, Onhand: String, allocated: String, hardAllocated: String, fileNameColumn: String)
vstream Warehouse2OutputSchema(id: Integer, item: String, Onhand: String, allocated: String, hardAllocated: String, fileNameColumn: String, description: StringList)
vstream ItemSchema(item: String, description: StringList)

##===================================================
## warehouse 1
##===================================================
bundle warehouse1Bundle := ()
for_begin @i 1 to 3
  stream Warehouse1Stream@i(schemaFor(Warehouse1Schema))
    := Source()["file:///SOURCEFILE", nodelays, csvformat]{}
    -> node(np, 0), partition["Sources"]
  warehouse1Bundle += Warehouse1Stream@i
for_end

## stream for computing subindex
stream StreamWithSubindex(schemaFor(Warehouse1Schema), subIndex: Integer)
  := Functor(warehouse1Bundle[:])[]
     { subIndex := (toInteger(strSubstring(item, 6, 2)) / (60 / COMPUTE_NODE_NUM)) - 2 }
  -> node(np, 0), partition["Sources"]

for_begin @i 1 to COMPUTE_NODE_NUM
  stream ItemStream@i(schemaFor(Warehouse1Schema), subIndex: Integer)
for_end
  := Split(StreamWithSubindex) [ subIndex ]{}
  -> node(np, 0), partition["Sources"]

for_begin @i 1 to COMPUTE_NODE_NUM
  stream Warehouse1Sort@i(schemaFor(Warehouse1Schema))
    := Sort(ItemStream@i <count(SOURCE_COUNT@i)>)[item, asc]{}
    -> node(np, @i-1), partition["CMP%@i"]
  stream Warehouse1Filter@i(schemaFor(Warehouse1Schema))
    := Functor(Warehouse1Sort@i)[ Onhand = "0001.000000" ] {}
    -> node(np, @i-1), partition["CMP%@i"]
  Nil := Sink(Warehouse1Filter@i)["file:///WAREHOUSE1_OUTPUTFILE@i", csvFormat, noDelays]{}
    -> node(np, @i-1), partition["CMP%@i"]
for_end
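Before moving on to the second half of the program, it is worth dwelling on the subIndex expression in the Functor above, since it is what spreads the item key space over the compute hosts. A small Python sketch of that mapping follows; it is an illustration only, and the substring indexing and constants are assumptions based on the slide rather than part of the deck:

COMPUTE_NODE_NUM = 10   # number of compute hosts (illustrative; the experiments vary this)
KEY_GROUPS = 60         # "around 60" distinct item key groups in the data set

def sub_index(item: str) -> int:
    # Route an item code such as '0100-0300-00' to a compute-node bucket.
    # Mirrors the SPADE Functor expression
    #   subIndex := (toInteger(strSubstring(item, 6, 2)) / (60 / COMPUTE_NODE_NUM)) - 2
    # i.e. take two digits of the item code, integer-divide by the number of key
    # groups per compute node, and shift the result onto 0..COMPUTE_NODE_NUM-1
    # (the -2 offset is taken from the slide and reflects this data set's key range).
    key_group = int(item[6:8])   # two digits of the item code (assumed slice)
    return key_group // (KEY_GROUPS // COMPUTE_NODE_NUM) - 2

# The Split operator then forwards each tuple on the ItemStream selected by
# sub_index(item), so every compute host sorts and joins a disjoint slice of items.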
SPADE Program (2/2)
##====================================================
## warehouse 2
##====================================================
stream ItemsSource(schemaFor(ItemSchema))
  := Source()["file:///ITEMS_FILE", nodelays, csvformat]{}
  -> node(np, 1), partition["ITEMCATALOG"]
stream SortedItems(schemaFor(ItemSchema))
  := Sort(ItemsSource <count(ITEM_COUNT)>)[item, asc]{}
  -> node(np, 1), partition["ITEMCATALOG"]

for_begin @i 1 to COMPUTE_NODE_NUM
  stream JoinedItem@i(schemaFor(Warehouse2OutputSchema))
    := Join(Warehouse1Sort@i <count(SOURCE_COUNT@i)>; SortedItems <count(ITEM_COUNT)>)
       [ LeftOuterJoin, {item} = {item} ]{}
    -> node(np, @i-1), partition["CMP%@i"]
for_end

##=================================================
## warehouse 3
##=================================================
for_begin @i 1 to COMPUTE_NODE_NUM
  stream SortedItems@i(schemaFor(Warehouse2OutputSchema))
    := Sort(JoinedItem@i <count(JOIN_COUNT@i)>)[id, asc]{}
    -> node(np, @i-1), partition["CMP%@i"]
  stream AggregatedItems@i(schemaFor(Warehouse2OutputSchema), count: Integer)
    := Aggregate(SortedItems@i <count(JOIN_COUNT@i)>) [ item . id ]
       { Any(id), Any(item), Any(Onhand), Any(allocated), Any(hardAllocated), Any(fileNameColumn), Any(description), Cnt() }
    -> node(np, @i-1), partition["CMP%@i"]
  stream JoinedItem2@i(schemaFor(Warehouse2OutputSchema), count: Integer)
    := Join(SortedItems@i <count(JOIN_COUNT@i)>; AggregatedItems@i <count(AGGREGATED_ITEM@i)>)
       [ LeftOuterJoin, {id, item} = {id, item} ] {}
    -> node(np, @i-1), partition["CMP%@i"]
  stream SortJoinedItem@i(schemaFor(Warehouse2OutputSchema), count: Integer)
    := Sort(JoinedItem2@i <count(JOIN_COUNT@i)>)[ id(asc) . fileNameColumn(asc) ]{}
    -> node(np, @i-1), partition["CMP%@i"]
  stream DuplicatedItems@i(schemaFor(Warehouse2OutputSchema), count: Integer)
  stream UniqueItems@i(schemaFor(Warehouse2OutputSchema), count: Integer)
    := Udop(SortJoinedItem@i)["FilterDuplicatedItems"]{}
    -> node(np, @i-1), partition["CMP%@i"]
  Nil := Sink(DuplicatedItems@i)["file:///DUPLICATED_FILE@i", csvFormat, noDelays]{}
    -> node(np, @i-1), partition["CMP%@i"]
  stream FilterStream@i(item: String, recorded_indicator: Integer)
    := Functor(UniqueItems@i)[] { item, 1 }
    -> node(np, @i-1), partition["CMP@i"]
  stream AggregatedItems2@i(LoadNum: Integer, Item_Load_Count: Integer)
    := Aggregate(FilterStream@i <count(UNIQUE_ITEM@i)>) [ recorded_indicator ]
       { Any(recorded_indicator), Cnt() }
    -> node(np, @i-1), partition["CMP@i"]
  stream AddTimeStamp@i(LoadNum: Integer, Item_Load_Count: Integer, LoadTimeStamp: Long)
    := Functor(AggregatedItems2@i)[] { LoadNum, Item_Load_Count, timeStampMicroseconds() }
    -> node(np, @i-1), partition["CMP@i"]
  Nil := Sink(AddTimeStamp@i)["file:///final_result.out", csvFormat, noDelays]{}
    -> node(np, @i-1), partition["CMP@i"]
for_end

Qualitative Evaluation of SPADE
Implementation
– Lines of SPADE: 76
– Number of operators: 19 (including 1 UDOP)
Evaluation
– With the built-in operators of SPADE, we could develop the given ETL scenario in a highly productive manner
– The ability of System S to run a SPADE program on distributed nodes was a great help
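The one user-defined operator in the program, the UDOP registered as "FilterDuplicatedItems" (drawn as SplitDuplicatedTuples in the diagrams), is the only step whose logic is not expressed in SPADE. One plausible reading of its role, splitting the sorted stream into duplicated and unique tuples on its two output streams, sketched in Python; the (id, item) grouping key is an assumption based on the surrounding Sort and Join keys, not something the deck states:

from itertools import groupby
from typing import Iterable, List, Tuple

def split_duplicated(sorted_tuples: Iterable[dict]) -> Tuple[List[dict], List[dict]]:
    # Split a stream already sorted by (id, item) into duplicates and uniques,
    # corresponding to the UDOP's two output streams
    # (DuplicatedItems@i and UniqueItems@i) in the program above.
    duplicated, unique = [], []
    for _, grp in groupby(sorted_tuples, key=lambda t: (t["id"], t["item"])):
        grp = list(grp)
        if len(grp) > 1:
            duplicated.extend(grp)   # all copies go to the DuplicatedItems sink
        else:
            unique.extend(grp)       # singletons continue down the pipeline
    return duplicated, unique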
Performance Evaluation
– Total nodes: 14 nodes, 56 CPU cores
– Spec for each node: Intel Xeon X5365 3.0 GHz (4 physical cores with HT), 16 GB memory, RHEL 5.3 64-bit (Linux kernel 2.6.18-164.el5)
– Network: InfiniBand (DDR, 20 Gbps) or 1 Gbps network
– Software: InfoSphere Streams (beta version)
– Data: 9 million records (1 record is around 100 bytes)

Node Assignment
[Diagram: node assignment A – 14 nodes (e0101b0${n}e1), 4 cores each: 1 node for data distribution, 1 node for item sorting, 10 compute hosts (40 cores); the remaining 2 nodes are not used.]

Throughput for Processing 9 Million Records
Maximum throughput: around 180,000 records per second (144 Mbps)
[Chart: throughput (records/s) and speedup over 4 cores for processing the 9M records, vs. number of cores (4–40); node assignment A.]

Analysis (I-a): Breakdown of the Total Time – Data Distribution is Dominant
[Chart: elapsed time (s) for processing 9 million records vs. number of cores, split into time for computation and time for data distribution; node assignment A.]

Analysis (I-b): Speedup over 4 cores when focusing only on the computation part
[Chart: speedup of throughput for compute time vs. number of cores (4–40), compared with linear speedup; the computation part scales beyond linear. Node assignment A.]

CPU Utilization at Compute Hosts
[Chart: CPU utilization over time at a compute host, alternating between bursts of computation and idle periods.]

Outline – next: Performance Optimization

Performance Optimization
– The previous experiment shows that most of the time is spent in data distribution and I/O processing
– For performance optimization, we implemented the SPADE program so that all nodes participate in data distribution, with each Source operator responsible only for a chunk of the data records, i.e. the input divided by the number of Source operators
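A sketch of the chunking this relies on: each Source operator gets its own slice of the 9M-record input, produced ahead of time (the backup slides note this division was done with the Linux split tool; the Python below only illustrates the same division and is not the tooling used in the experiments):

def chunk_input(path: str, num_sources: int, total_records: int = 9_000_000) -> None:
    # Divide one input file into num_sources equal chunks, one per Source operator.
    # Remainder records are ignored for brevity; path and the 9M count follow the deck.
    chunk_size = total_records // num_sources
    with open(path) as src:
        for i in range(num_sources):
            with open(f"{path}.part{i}", "w") as out:
                for _ in range(chunk_size):
                    line = src.readline()
                    if not line:
                        break
                    out.write(line)

# Each of the num_sources Source operators then reads only its own
# '<path>.partN' file, so every node contributes to data distribution.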
Performance Optimization (cont.)
1. We modified the SPADE data-flow program so that multiple Source operators participate in the data distribution
2. Each data distribution node reads one chunk of the whole data set
[Diagram: original SPADE program – a single data distribution host runs all three Sources (6 million records, bundled), a Functor and a Split keyed by item, feeding compute hosts 1..N plus the Item Catalog host. Optimized SPADE program – every compute host also runs its own Source and Split over its chunk of the Warehouse Items files, so data distribution is spread across all hosts; the per-host Sort → Join → Sort → Aggregate → Join → Sort → UDOP (SplitDuplicatedTuples) → Functor → Sink / ODBC Append pipeline is unchanged, with around 60 item key groups.]

Node Assignment
– All 14 nodes participate in the data distribution
– Each Source operator reads the 9M records divided by the number of Source operators
– The node assignment for the compute nodes is the same as in Experiment I
[Diagram: node assignment C – 14 nodes (e0101b0${n}e1), each with a local disk, acting as both data distribution and compute hosts.]

Elapsed Time with Varying Numbers of Compute Nodes and Source Operators
[Chart: elapsed time (s) for processing 9M records vs. number of source operators (3–45), for 20, 24, 28 and 32 compute nodes; node assignment C.]

Throughput: Over 800,000 Records per Second
[Chart: throughput (records/s) vs. number of source nodes (3–45), for 20, 24, 28 and 32 compute nodes; node assignment C.]

Scalability: Super-Linear Scaling Achieved with the Data Distribution Optimization
[Chart: speedup over 4 cores vs. number of compute nodes for 1 source operator, 3 source operators, the optimized configuration (9 source operators for 20, 24, 28, 32), a further-optimized configuration, and the linear reference; node assignment C.]
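As a sanity check on how these record rates translate into network load (records are roughly 100 bytes each, per the setup slide), a short calculation:

RECORD_BYTES = 100  # "1 record is around 100 bytes"

def to_mbps(records_per_sec: int, record_bytes: int = RECORD_BYTES) -> float:
    # Convert a record rate into megabits per second.
    return records_per_sec * record_bytes * 8 / 1e6

print(to_mbps(180_000))  # Experiment I peak:      144.0 Mbps
print(to_mbps(800_000))  # optimized pipeline:     640.0 Mbps, still below both
                         # InfiniBand (DDR 20 Gbps) and the nominal 1 Gbps network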
Outline – next: Related Work and Conclusions

Related Work
Near-real-time ETL
– Panos et al. review the state of the art of both conventional and near-real-time ETL [Springer, 2008]
ETL benchmarking
– Wyatt et al. identify common characteristics of ETL workflows in an effort to propose a unified evaluation method for ETL [Springer Lecture Notes, 2009]
– TPC-ETL: formed in 2008 and still under development by the TPC subcommittee

Conclusions and Future Work
Conclusions
– Demonstrated the software productivity and scalable performance of System S in the ETL domain
– After the data distribution optimization, we achieved super-linear scalability, processing around 800,000 records per second on 14 nodes
Future work
– Comparison with existing ETL tools and systems and with additional application scenarios (TPC-ETL?)
– Automatic data distribution optimization

Future Direction: Automatic Data Distribution Optimization
We were able to identify the appropriate number of source operators through a series of long-running experiments. However, it is not reasonable for a distributed system such as System S to force users and developers to find the appropriate number of source nodes experimentally. We need an automatic optimization mechanism that maximizes throughput by finding the best number of source nodes, transparently to the user.
[Diagram: a node pool hosting source operators S1..Sn reading data partitions d1..dn and compute operators C1..Cm, with connections n(Si, Cj) between them.]

Questions?
Thank You

Backup

Towards Adaptive Optimization
– The current SPADE compiler has a compile-time optimizer that collects statistics such as tuple and byte rates and the CPU share of each operator
– We would like to let users and developers write a SPADE program in the simple form shown on the left, without having to think about data partitioning and distribution
– By extending the current optimizer, the system could automatically convert the left-hand program into the right-hand program, which maximizes data distribution
[Diagram: an original SPADE program with a single Source operator S feeding compute operators C1..Cm is converted by a data distribution optimizer into an optimized program with source operators S1..Sn reading partitions d1..dn and feeding C1..Cm.]
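One minimal realization of the proposed optimizer would be a measurement-driven search over the number of Source operators. The sketch below is an assumption about how such a loop could look, not an existing System S facility; measure_throughput is a hypothetical callback that deploys the flow with n sources and times a replay of the input:

from typing import Callable, Iterable, Tuple

def best_source_count(candidates: Iterable[int],
                      measure_throughput: Callable[[int], float]) -> Tuple[int, float]:
    # Pick the number of Source operators that maximizes measured throughput.
    # measure_throughput(n) is assumed to deploy the ETL flow with n Source
    # operators, replay the 9M-record input, and return records per second.
    best_n, best_rate = None, 0.0
    for n in candidates:
        rate = measure_throughput(n)
        if rate > best_rate:
            best_n, best_rate = n, rate
    return best_n, best_rate

# In the deck's experiments this search was effectively done by hand over
# candidates such as (3, 6, 9, 12, 15, 18, 20, 25, 30, 45).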
Executive Summary: Optimized Version vs. Others
Motivation
– Evaluate System S as an ETL platform on a large experimental environment, the Watson cluster
– Understand the performance characteristics on such a large testbed, such as scalability and performance bottlenecks
Findings
– A series of experiments showed that the data distribution cost is dominant in the ETL processing
– The optimized version shows that changing the number of data feed (source) operators dramatically increases throughput and obtains higher speedups than the other configurations
– The InfiniBand network is critical for this kind of ETL workload, which includes a barrier before all the data is gathered for the sorting operation; we achieved almost double the performance of the 1 Gbps network
[Charts: elapsed time for the baseline (computation vs. data distribution, 4–40 cores); speedup over 4 cores for 1 source operator, 3 source operators, and the optimized configuration (9 source operators for 20, 24, 28, 32); throughput vs. number of source nodes on the 1 Gbps network and on InfiniBand.]

Node Assignment (B) for Experiment II
The experimental environment comprises 3 source nodes for data distribution, 1 node for item sorting, and 10 nodes for computation. Each compute node has 4 cores, and we manually allocate operators with the following scheduling policy (shown here for the case of 32 compute operators): each operator is allocated to the adjacent node in order.
[Diagram: node assignment B – 14 nodes (e0101b0${n}e1); nodes 1–3 for data distribution, node 4 for item sorting, nodes 5–14 as compute hosts (40 cores), with the 32 compute operators assigned round-robin.]
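The scheduling policy above ("each operator is allocated to the adjacent node in order") is plain round-robin over the compute hosts; as a tiny sketch (the zero-based host numbering is an assumption for illustration):

COMPUTE_HOSTS = 10  # 10 compute nodes x 4 cores in node assignment B

def host_for(operator_index: int) -> int:
    # Round-robin placement: operator k goes to compute host k mod 10,
    # so 32 per-partition pipelines spread evenly over the 10 hosts.
    return operator_index % COMPUTE_HOSTS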
SPADE Program with Data Distribution Optimization
Since 3 nodes participate in the data distribution, the number of inter-host connections is at most 120 (3 x 40).
[Diagram: three data distribution hosts (c0101b01, c0101b02, c0101b03), each running a Source over its chunk of Warehouse_20090901_2.txt, a Functor, and a Split; their outputs fan out to the compute hosts (c0101b05, c0101b06, c0101b07, ..., s72x336-14), each running the Sort → Join → Sort → Aggregate → Join → Sort → UDOP (SplitDuplicatedTuples) → Functor → Sink / ODBC Append pipelines.]

New SPADE Program
Before (Experiment I):
bundle warehouse1Bundle := ()
for_begin @i 1 to 3
  stream Warehouse1Stream@i(schemaFor(Warehouse1Schema))
    := Source()["file:///SOURCEFILE", nodelays, csvformat]{}
    -> node(np, 0), partition["Sources"]
  warehouse1Bundle += Warehouse1Stream@i
for_end

After:
#define SOURCE_NODE_NUM 3

for_begin @j 1 to COMPUTE_NODE_NUM
  bundle warehouse1Bundle@j := ()
for_end

for_begin @i 0 to SOURCE_NODE_NUM-1
  stream Warehouse1Stream@i(schemaFor(Warehouse1Schema))
    := Source()["file:///SOURCEFILE", nodelays, csvformat]{}
    -> node(SourcePool, @i), partition["Sources@i"]
  stream StreamWithSubindex@i(schemaFor(Warehouse1Schema), subIndex: Integer)
    := Functor(Warehouse1Stream@i)[]
       { subIndex := (toInteger(strSubstring(item, 6, 2)) / (60 / COMPUTE_NODE_NUM)) }
    -> node(SourcePool, @i), partition["Sources@i"]
  for_begin @j 1 to COMPUTE_NODE_NUM
    stream ItemStream@i@j(schemaFor(Warehouse1Schema), subIndex: Integer)
  for_end
    := Split(StreamWithSubindex@i) [ subIndex ]{}
    -> node(SourcePool, @i), partition["Sources@i"]
  for_begin @j 1 to COMPUTE_NODE_NUM
    warehouse1Bundle@j += ItemStream@i@j
  for_end
for_end

for_begin @j 1 to COMPUTE_NODE_NUM
  stream StreamForWarehouse1Sort@j(schemaFor(Warehouse1Schema))
    := Functor(warehouse1Bundle@j[:])[]{}
    -> node(np, @j-1), partition["CMP%@j"]
  stream Warehouse1Sort@j(schemaFor(Warehouse1Schema))
    := Sort(StreamForWarehouse1Sort@j <count(SOURCE_COUNT@j)>)[item, asc]{}
    -> node(np, @j-1), partition["CMP%@j"]
for_end
(Warehouses 2, 3, and 4 are omitted in this chart, but were executed in the experiment.)
Node Assignment (C) for Experiment III
– All 14 nodes participate in the data distribution, and Source operators are assigned as shown in the diagram: for example, with 24 Source operators, operators are allocated to the nodes in order, and once 14 source operators have been placed on the 14 nodes, the next source operator wraps around to the first node
– Each Source operator reads the 9M records divided by the number of Source operators; this data division is done beforehand with the Linux "split" tool
– The node assignment for the compute nodes is the same as in Experiment I
[Diagram: node assignment C – 14 nodes (e0101b0${n}e1), each with a local disk, all acting as data distribution hosts as well as compute hosts.]

Performance Result for Experiment II and Comparison with Experiment I
When 3 nodes participate in the data distribution, the throughput almost doubles compared with the result of Experiment I.
[Chart: throughput (records/s) for the non-optimized run (EXP20091129a) and the I/O-optimized run vs. number of cores (4–32), plus the speed-up ratio of the optimized run over the non-optimized one; node assignment B.]

Analysis (II-a): Optimization by Changing the Number of Source Operators
Motivation
– In the previous result, throughput saturates around 16 cores because the data feed rate cannot keep up with the computation
Experimental environment
– We changed the number of source operators while keeping the total data volume fixed (9M records) and measured throughput
– We tested only 9MDATA-32 (32 operators for computation)
Experimental results
– This experiment shows that 9 source nodes obtain the best throughput
[Diagram: node assignment for 9 data distribution nodes. Chart: throughput (records/s) vs. number of source nodes (3, 5, 9, 15) with 32 cores used for computation; node assignment B.]
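The saturation argument here (throughput flattens once the compute side outruns the data feed) can be made concrete with a small balance calculation. The per-core and per-source rates below are hypothetical placeholders, not measurements from the deck:

import math

def sources_needed(compute_cores: int,
                   per_core_rate: float,
                   per_source_rate: float) -> int:
    # Minimum number of Source operators so the feed keeps all cores busy:
    # n_sources * per_source_rate >= compute_cores * per_core_rate.
    return math.ceil(compute_cores * per_core_rate / per_source_rate)

# Hypothetical example: if each core consumed ~20k records/s and each Source
# operator could feed ~80k records/s, 32 cores would need
# ceil(32 * 20000 / 80000) = 8 sources; of the tested configurations, 9 is the
# closest, matching the observation that 9 source operators performed best at 32 cores.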
Analysis (II-b): Increased Throughput by Data Distribution Optimization
The following graph shows the overall results of applying the same optimization approach as in the previous experiment, i.e. increasing the number of source operators: 3 source operators are used for 4, 8, 12 and 16 cores, and 9 source operators are used for 20, 24, 28 and 32 cores. We achieved a 5.84x speedup over 4 cores at 32 cores.
[Chart: throughput (records/s) vs. number of cores (4–32) for 1 source operator, 3 source operators, and the optimized configuration (9 source operators for 20, 24, 28, 32); node assignment B.]

Analysis (II-c): Increased Throughput by Data Distribution Optimization
The yellow line shows the best performance, with 9 nodes participating in the data distribution for 20, 24, 28 and 32 cores.
[Chart: speedup over 4 cores vs. number of cores for 1 source operator, 3 source operators, the optimized configuration (9 source operators for 20, 24, 28, 32), and ideal scalability; node assignment B.]

Experiment (III): Increasing the Number of Source Operators Further
Motivation
– Understand the performance characteristics when using more source operators than in the previous experiment (Experiment II)
– Compare the InfiniBand network with a commodity 1 Gbps network
Experimental setting
– We increase the number of source operators from 3 up to 45 and test this configuration against relatively large numbers of compute nodes: 20, 24, 28 and 32
– The node assignment for data distribution and computation is the same as in the previous experiment (Experiment II)

Analysis (III-a): Throughput and Elapsed Time
The maximum total throughput, around 800,000 tuples/s (1 tuple = 100 bytes) = 640 Mbps, is below the network bandwidth of both InfiniBand and the 1 Gbps LAN.
[Chart: throughput (records/s) and elapsed time vs. number of source nodes (3–45) for 20, 24, 28 and 32 compute nodes; node assignment C.]
Analysis (III-c): Performance Without InfiniBand
Throughput
– In this experiment we measured throughput without InfiniBand for varying numbers of source operators. Although the network is nominally 1 Gbps, System S overhead is assumed to keep the achievable bandwidth below that limit.
– The throughput is around 400,000 data records per second at maximum, which corresponds to about 360 Mbps.
– Unlike the results obtained with InfiniBand, throughput saturates around 12–15 source operators.
– A drastic performance degradation from 15 to 18 source operators can be observed; we assume this is because, once 14 source operators have been allocated to the 14 nodes, two or more operators (processes) access the same 1 Gbps network card simultaneously and resource contention occurs.
[Charts: throughput (records/s) and elapsed time (s) for processing 9M records vs. number of source operators (2–45), for 20, 24, 28 and 32 compute nodes, on the 1 Gbps network; node assignment C.]

Analysis (III-d): Comparison Without and With InfiniBand
This chart compares performance with the InfiniBand network disabled and enabled. The absolute throughput with InfiniBand is double that without it, which indicates that using InfiniBand is essential for obtaining high throughput in ETL-type workloads.
[Charts: throughput vs. number of source nodes without InfiniBand and with InfiniBand, for 20, 24, 28 and 32 compute nodes; node assignment C.]

Analysis (I-c): Elapsed Time for Distributing 9M Records to Multiple Cores
The following graph demonstrates that the elapsed time for distributing all the data to varying numbers of compute cores is nearly constant.
[Chart: elapsed time (s) for distributing 9 million records vs. number of cores (4–40). Hardware environment: 14 nodes, Intel Xeon 3.0 GHz, 4 cores, 16 GB RAM, RHEL 5.3 64-bit, InfiniBand; node assignment A.]
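This near-constant distribution time is exactly why Experiment I stopped scaling: with a fixed distribution phase, overall speedup is bounded in Amdahl fashion. A small sketch with hypothetical phase times (illustrative numbers, not values read from the charts):

def overall_speedup(t_distribution: float, t_compute_4cores: float, cores: int) -> float:
    # Speedup over the 4-core run when only the compute phase scales
    # (the distribution time stays constant, as the chart above shows).
    t_base = t_distribution + t_compute_4cores
    t_now = t_distribution + t_compute_4cores * 4 / cores
    return t_base / t_now

# Hypothetical illustration: with a 50 s distribution phase and a 60 s compute
# phase at 4 cores, 40 cores give only (50 + 60) / (50 + 6) ≈ 2.0x overall,
# even though the compute phase alone sped up 10x.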