Trisk: Task-Centric Data Stream Reconfiguration

Yancan Mao, National University of Singapore, maoyancan@u.nus.edu
Yuan Huang, National University of Singapore, dcsyhg@nus.edu.sg
Runxin Tian, National University of Singapore, tianrunxin@u.nus.edu
Xin Wang, National University of Singapore, dcswan@nus.edu.sg
Richard T. B. Ma, National University of Singapore, tbma@comp.nus.edu.sg

ABSTRACT

Due to the long-running and unpredictable nature of stream processing, any statically configured execution of stream jobs fails to process data in a timely and efficient manner. To meet performance requirements, stream jobs need to be reconfigured dynamically. In this paper, we present Trisk, a control plane that supports versatile reconfigurations while keeping high efficiency with easy-to-use programming APIs. Trisk enables versatile reconfigurations with usability based on a task-centric abstraction, and encapsulates primitive operations such that reconfigurations can be described by composing the primitive operations on the abstraction. Trisk adopts a partial pause-and-resume design for efficiency, through which synchronization mechanisms in the native stream systems can further be leveraged. We implement Trisk on Apache Flink and demonstrate its usage and performance under realistic application scenarios. We show that Trisk executes reconfigurations with shorter completion time and comparable latency compared to a state-of-the-art fluid mechanism for state management.

ACM Reference Format:
Yancan Mao, Yuan Huang, Runxin Tian, Xin Wang, and Richard T. B. Ma. 2021. Trisk: Task-Centric Data Stream Reconfiguration. In ACM Symposium on Cloud Computing (SoCC '21), November 1–4, 2021, Seattle, WA, USA. ACM, New York, NY, USA, 15 pages. https://doi.org/10.1145/3472883.3487010

Permission to make digital or hard copies of part or all of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. Copyrights for third-party components of this work must be honored. For all other uses, contact the owner/author(s).
SoCC '21, November 1–4, 2021, Seattle, WA, USA
© 2021 Copyright held by the owner/author(s).
ACM ISBN 978-1-4503-8638-8/21/11.
https://doi.org/10.1145/3472883.3487010

1 INTRODUCTION

With the development of Internet-scale services, data is generated in high volume, velocity and variety. Applications with time constraints are increasingly implemented in the form of stream processing, where arriving data is processed immediately with low latency and high throughput. Today, many distributed stream systems, e.g., Samza [32], Flink [9], Heron [24], Storm [41] and Spark Streaming [44], have been developed to parallelize, deploy and manage stream jobs for users.

Because data streams naturally fluctuate in rate and distribution over time, stream jobs must process data in a timely manner to satisfy low-latency requirements [6, 8]. This requires stream systems to be able to reconfigure part of the dataflow computation dynamically during execution without affecting the correctness of the processing logic. We define such actions as reconfigurations on stream jobs. In practice, reconfigurations are often applied by a control policy to achieve certain performance goals. Based on prior literature [10], we summarize that a good stream system should enable reconfigurations with three desirable properties: versatility, efficiency, and usability.

Versatility. A stream system should support a wide variety of reconfigurations, such that various control policies that require different types of reconfigurations can be implemented. Common reconfigurations mainly include operations along three dimensions, i.e., resources, workloads, and execution logic. The resources and workloads often need to be re-assigned to handle data skewness and changes of input rates, while the execution logic needs to be updated to fix bugs and handle emerging events [7, 9].

Efficiency. Reconfigurations should be executed and completed in a short time, with minimum impact on the original stream job execution. Stream jobs are physically executed by a set of parallel tasks. To guarantee the correctness of job execution during reconfigurations, synchronization is required among those parallel tasks, which blocks the system temporarily. Thus, it is important to execute reconfigurations efficiently to minimize the unavoidable unavailability time during reconfigurations.

Usability. A stream system should also provide intuitive and easy-to-use APIs for users to implement their control policies, ideally without assuming that users understand the details of reconfiguration execution.

Although existing works provide some of the desirable properties, they are unable to achieve all of them. Due to the use of a kill-and-restart method to execute reconfigurations, Flink [9], Samza [32], and Heron [24] enable reconfigurations at a high cost of efficiency. Research prototypes such as Megaphone [19] and Rhino [29] proposed efficient state management primitives with high usability, but lack support for other types of reconfigurations, such as change of logic to update execution logic. Chi [28] used a control-message-based programming model to support various control logic, but was not mainly designed for reconfigurations. As specific system-level operations need to be specified to implement a reconfiguration, Chi was targeted at advanced users who manage system internals.

In this paper, we present Trisk¹, a control plane solution that supports reconfigurations of stream jobs with all three properties. The core of Trisk is a task-centric abstraction that describes the execution plan of the target stream job. The execution plan of a stream job maintains the configurations of its physical tasks and is used to deploy the job on a cluster. Since any reconfiguration boils down to changing the existing execution plan to a new one, it can be formally described by the operations applied on the current execution plan. To provide usability, we classify the operations into three types of primitive operations, so that various reconfigurations can be implemented by applying a combination of primitive operations on the Trisk abstraction. To execute reconfigurations efficiently with low system overhead, we adopt a partial pause-and-resume mechanism by leveraging synchronization mechanisms in the native stream system, where only part of the stream job is paused and updated. We implement Trisk on top of Apache Flink by leveraging the checkpoint mechanism to achieve synchronization, and show that Trisk achieves sub-second completion time to execute reconfigurations. In summary, we make the following contributions:

• We propose a control plane solution, Trisk, that maintains a task-centric abstraction with three-dimensional primitive operations to implement versatile reconfigurations with high usability.
• We design and implement a prepare-sync-resume pipeline to execute reconfigurations by leveraging synchronization mechanisms in the native stream system.
• We integrate Trisk with Flink and leverage the checkpoint mechanism in Flink to execute reconfigurations.
• We evaluate Trisk via comprehensive experiments using both real-world applications and a synthetic micro-benchmark. We also compare Trisk with native Flink on the performance of supporting control policies and executing reconfigurations.

¹ The source code is available at: https://github.com/sane-lab/Trisk

2 BACKGROUND AND MOTIVATION

In this section, we first introduce the terminology used in this paper, and then motivate the necessity of supporting reconfigurations with the three proposed properties.

A distributed stream job runs as a physical deployment of an execution plan, which instantiates operators as physical parallel tasks. An execution plan describes the configurations of a stream job, and can be represented as a directed graph, where vertices represent tasks instantiated from operators, and edges represent the data flow between tasks. Specifically, operators maintain the user-defined execution logic to process the input data, and tasks that are instantiated from the same operator share the same execution logic. The input data of an operator forms the workloads to be processed by tasks in parallel. The workloads of an operator are commonly grouped by keys and partitioned across tasks. Each task is allocated certain resources, such as CPU cores and memory on a node in the cluster, for physical execution.

To achieve performance requirements, users often apply a control policy on stream jobs. A control policy involves two steps. First, it monitors the stream job and decides whether or not to update the current execution plan based on the symptoms detected, e.g., backpressure in the pipeline. Second, the control policy needs to identify the performance bottleneck in the stream job and invoke reconfigurations to optimize it accordingly. Different control policies make decisions based on different kinds of metrics [14, 15, 22] at both the system level, e.g., CPU utilization, and the application level, e.g., observed arrival rate and backpressure. In this work, we focus on the execution of reconfigurations given the decisions of control policies, while metrics retrieval mechanisms are regarded as a part of the control logic.

Reconfigurations need to dynamically change the physical execution plan of a stream job, which boils down to reconfiguring its resources, workloads, and execution logic. Such a variety of reconfigurations is required by control policies to achieve different performance goals. For example, to achieve an SLO/SLA objective for general stream jobs, prior works such as Henge [23], Dhalion [14], DS2 [22], and DRS [15] introduce control policies based on scaling to reallocate resources for stream jobs. To achieve balanced load and better resource utilization, prior works such as DKG [40] propose control policies to detect data skewness and apply load balancing to manage the workloads of stream jobs. Furthermore, for machine-learning-based stream jobs such as online anomaly detection [18], because new scenarios and input data emerge over time, the model with its current parameters may fail to process them accurately and effectively. To solve this problem, the model needs to be updated appropriately, where change of logic can be applied to achieve dynamic model tuning.

Table 1: Overview of existing work that enables reconfigurations in stream systems.

| System | Methodology | Versatility | Usability | Efficiency |
|---|---|---|---|---|
| Flink [9] | Dataflow model + Redeploy | Medium | High | Low |
| Heron [24] | Dataflow model + Redeploy | Medium | High | Low |
| SEEP [12] | State management primitives + Partial redeploy | Low | High | Medium |
| Rhino [29] | State management primitives + Partial update | Low | High | High |
| Megaphone [19] | State management primitives + Non-stop partial update | Medium | High | High |
| Chi [28] | Message-based programming model + Partial update | High | Medium | High |
| Trisk | Three-dimensional task-centric abstraction + Partial update | High | High | High |

Although reconfiguration is best supported with three properties [10] (versatility, efficiency, and usability), existing systems and research fall short of achieving all of them. We summarize existing works that support reconfigurations for stream jobs in Table 1, and classify them into three types of implementations.

Built-in reconfiguration leverages the original dataflow model and programming interfaces provided by stream systems to enable reconfigurations. For example, Flink [9] and Heron [24] redeploy the stream job with updated context for all tasks, i.e., they restart the job with modified source code and configuration files. Although reconfigurations can be easily invoked through the original programming interfaces provided by the stream systems, they are executed with low efficiency and incur high system overhead and performance degradation due to the nature of kill-and-restart.

Reconfiguration for state management has been designed in prior works such as SEEP [12], Rhino [29] and Megaphone [19]. These works proposed state management primitives that provide interfaces to manage the state of stream jobs efficiently. Stateful stream jobs maintain state to process each of the assigned keys, which is regarded as a workload-related configuration in our context. With the provided interfaces, reconfigurations that cover workload redistribution for stateful jobs can be implemented with high usability and efficiency. However, such primitives are limited to state management and do not support other types of reconfigurations such as placement and change of logic.

Reconfiguration via a control plane encapsulates mechanisms for applying various control logic on stream jobs, which supports a variety of reconfigurations. Chi [28] proposes a programming model based on control message injection, through which new reconfigurations can be implemented by applying fine-grained instructions on each task and embedding them into control messages. Tasks are updated asynchronously upon receiving the instructions in the control messages. However, since the task update logic is defined by users, they need to be familiar with the execution details of the stream system and implement instructions accordingly, which requires non-trivial engineering effort.

Targeting all three desirable properties for stream reconfigurations, Trisk is designed as a control plane solution applicable to general stream systems, and encapsulates mechanisms for general control policies. To achieve versatility, Trisk uses a task-centric abstraction, which describes the configurations of each task in three dimensions, i.e., resources, workloads and execution logic. The Trisk abstraction is designed around tasks, as the states of tasks describe configurations at the minimum granularity, i.e., reconfigurations can be achieved by updating a subset of tasks. For example, load balancing redistributes workloads among tasks, scaling cancels or deploys tasks, placement redeploys tasks on other nodes, and change of logic updates the execution logic of tasks. Based on the abstraction, Trisk implements three primitive operations (Section 3.1) on updating tasks along the three dimensions and encapsulates them as a set of APIs. For usability, Trisk provides common reconfigurations (Section 3.3) for users to implement control policies easily, while any general reconfiguration can be implemented by composing primitive operations (Section 4.2). Trisk uses a prepare-sync-update execution pipeline to execute reconfigurations efficiently (Section 3.2), under which tasks are partially paused and updated asynchronously. This enables Trisk to leverage the synchronization mechanisms in native stream systems with low system overhead.

3 DESIGN

We focus on the problem of reconfiguring stream jobs on-the-fly, and our goal is to design a control plane that enables versatile reconfigurations while maintaining usability and efficiency. In this section, we first introduce the design of the Trisk abstraction, and then describe the mechanisms that enable asynchronous execution of the reconfiguration. Last, we present the reconfiguration APIs and show how users can implement control policies by using them.

3.1 The Trisk Abstraction

The Trisk abstraction maintains an abstract execution plan that is independent of stream systems for extensibility. This is achieved by specifying the execution plan in terms of the configurations with respect to individual tasks, which can be classified along three dimensions: execution logic, workloads, and resources. In other words, any reconfiguration consists mainly of three types of operations. The intuition behind the three-dimensional Trisk abstraction is derived from the three-step deployment of stream jobs shown in Figure 1. This stream job has two operators ($O_1$, $O_2$), which are instantiated as three tasks ($T_1$, $T_2$, $T_3$) physically deployed across two machines. The keyspace of the data stream contains four unique keys and is partitioned into two substreams.

Figure 1: Deployment steps of jobs in stream systems.

In the first step, a stream job is defined by its logical topology described as a DAG, where vertices represent operators and edges represent the intermediate data streams. At this stage, the execution logic is configured and associated with each operator, implying that all instances of parallel tasks of the operator will use the same execution logic to process the assigned input streams so as to generate outputs.

In the second step, the stream job specifies the number of parallel tasks to be instantiated for each operator. Input data is often defined over a key space, and each task is assigned a non-overlapping subset of keys for independent data processing. The configuration is maintained by both upstream and downstream tasks. In particular, the upstream tasks maintain the routing information, which maps their processing results to downstream tasks. Each downstream task keeps a subset of input keys representing the subset of substreams to be processed and the corresponding states to be managed. At this stage, the configuration of workloads needs to be specified for the individual tasks.

In the final step, the stream job deploys tasks on physical machines. In particular, each task is assigned to a resource slot configured with resources that determine its performance. For example, computational resources such as CPU cores affect the processing rate, while memory is used to store ongoing processing states and affects the speed of I/O operations. Furthermore, data streams between an upstream and a downstream task go through the network if the two tasks are deployed on different physical machines.

Figure 2: Configurations of tasks in Trisk abstraction.

Figure 2 illustrates the four configurations associated with each task specified in the Trisk abstraction, i.e., Key State, User-Defined Function (UDF), Key Mapping, and Resource Slot.

• Along the execution logic dimension, the User-Defined Function (UDF) defines the processing logic applied to each input tuple that it receives. After processing, the results from the UDF form the output streams. For stateful tasks, the UDF has access to its processing state, which is generated according to the processing history of arrived data, and updates the state after new tuples are processed.
• Along the workloads dimension, the distribution of workloads among the tasks of an operator is described by the Key State distributed across the tasks and the Key Mapping in the upstream tasks. Key State represents the assigned subset of input keys to be processed and the associated processing state to be maintained. Key Mapping defines how a task maps the keys of output results to downstream tasks. The Key Mapping in the upstream tasks also represents the global Key State assignment of downstream tasks, i.e., the combination of the Key State of all downstream tasks.
• Along the resources dimension, Resource Slot denotes the amount of resources allocated to a task, e.g., CPU cores and memory obtained from the resource management system. It also describes the location where the task is deployed, which is important for communication efficiency and for avoiding resource contention.

Our task-centric abstraction is general enough to provide versatility of reconfigurations, because any reconfiguration boils down to updating the three types of task configurations that were originally set by the initial deployment of stream jobs. Besides the chosen configurations, the Trisk abstraction can be easily extended, since all configurations are generated during the initial deployment. For example, the batch size in mini-batch processing can be classified as a type of execution logic configuration that defines how input tuples are batched. Based on the dimensions of execution logic, workloads and resources, Trisk implements the common reconfigurations of change of logic, load balancing and placement, respectively. Furthermore, by using operations along the dimensions of workloads and resources, Trisk also implements scaling.
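
To make the per-task configurations concrete, the following is a minimal Python sketch of the abstraction described above; it is an illustration under stated assumptions, not Trisk's actual API (Trisk itself is implemented on Apache Flink). All names here (`ResourceSlot`, `TaskConfig`, `rebalance_key`, `mapping_is_consistent`, `build_example`) are hypothetical, and the assignment of $T_1$ to $O_1$ and of $T_2$, $T_3$ to $O_2$ is assumed for the example. The sketch models Key State, UDF, Key Mapping and Resource Slot for each task, performs a load-balancing step as a workloads-dimension update, and checks the stated property that the upstream Key Mapping equals the combination of the downstream Key States.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Set


@dataclass
class ResourceSlot:
    """Resources dimension: where a task runs and what it is allocated."""
    node: str
    cpu_cores: int
    memory_mb: int


@dataclass
class TaskConfig:
    """The four per-task configurations of the abstraction (names are hypothetical)."""
    task_id: str
    operator: str
    udf: Callable[[object], object]                            # execution logic
    key_state: Set[str] = field(default_factory=set)           # keys this task processes
    key_mapping: Dict[str, str] = field(default_factory=dict)  # output key -> downstream task
    slot: Optional[ResourceSlot] = None                        # resources


def rebalance_key(tasks: Dict[str, TaskConfig], upstream: List[str],
                  key: str, src: str, dst: str) -> None:
    """Load balancing: move one key between two tasks of the same operator
    and update the routing in every upstream task to match."""
    if key not in tasks[src].key_state:
        raise KeyError(f"{key} is not assigned to {src}")
    tasks[src].key_state.remove(key)
    tasks[dst].key_state.add(key)
    for up in upstream:
        tasks[up].key_mapping[key] = dst


def mapping_is_consistent(tasks: Dict[str, TaskConfig],
                          upstream: List[str], downstream: List[str]) -> bool:
    """The upstream Key Mapping must equal the combined downstream Key States."""
    expected = {k: t for t in downstream for k in tasks[t].key_state}
    return all(tasks[u].key_mapping == expected for u in upstream)


def build_example() -> Dict[str, TaskConfig]:
    """Two operators, three tasks, two machines, four keys in two substreams.
    The operator-to-task assignment is an assumption made for this sketch."""
    identity = lambda record: record
    return {
        "T1": TaskConfig("T1", "O1", identity,
                         key_mapping={"k1": "T2", "k2": "T2", "k3": "T3", "k4": "T3"},
                         slot=ResourceSlot("machine-1", 1, 1024)),
        "T2": TaskConfig("T2", "O2", identity, key_state={"k1", "k2"},
                         slot=ResourceSlot("machine-1", 1, 1024)),
        "T3": TaskConfig("T3", "O2", identity, key_state={"k3", "k4"},
                         slot=ResourceSlot("machine-2", 1, 1024)),
    }
```

In this sketch, a change of logic would replace `udf` and a placement would replace `slot`, so each common reconfiguration touches only the fields of the affected tasks, which mirrors the claim that reconfigurations can be achieved by updating a subset of tasks.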