https://github.com/bytedance/cloudshuffleservice

Cloud Shuffle Service(CSS) is a general purpose remote shuffle solution for compute engines, including Spark/Flink/MapReduce.

Science Score: 13.0%

This score indicates how likely this project is to be science-related based on various indicators:

  • CITATION.cff file
  • codemeta.json file
    Found codemeta.json file
  • .zenodo.json file
  • DOI references
  • Academic publication links
  • Committers with academic emails
  • Institutional organization owner
  • JOSS paper metadata
  • Scientific vocabulary similarity
    Low similarity (10.8%) to scientific vocabulary

Keywords

flink hadoop-mapreduce spark

Repository

Basic Info
  • Host: GitHub
  • Owner: bytedance
  • License: apache-2.0
  • Language: Java
  • Default Branch: main
  • Homepage:
  • Size: 1.23 MB
Statistics
  • Stars: 255
  • Watchers: 9
  • Forks: 58
  • Open Issues: 10
  • Releases: 0
Created almost 4 years ago · Last pushed about 2 years ago

README.md

Cloud Shuffle Service

Cloud Shuffle Service (CSS) is a general-purpose remote shuffle solution for compute engines, including Spark, Flink, and MapReduce. It provides reliable, high-performance, and elastic data shuffling capabilities to these compute engines. Shuffled data is pushed to the CSS cluster, stored on local disk or in HDFS, and can then be fetched from the CSS cluster by the compute engines.

CSS Architecture

  • CSS Worker

A CSS Worker stores shuffled data pushed from map tasks in memory and persists it to the file system asynchronously, so that reduce tasks can subsequently fetch the data from CSS workers.

  • CSS Master

CSS Master coordinates an application's shuffle process and is integrated into the application. It reads the CSS worker list from ZooKeeper, assigns workers to the application for shuffling, tracks the progress of running map tasks, and notifies CSS workers to commit files after all map tasks have finished.

  • CSS Client

Map and reduce tasks use the CSS client to push shuffled data to, and fetch it from, the assigned CSS workers.
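The interaction between the three components can be illustrated with a toy in-memory model. This is only a sketch of the flow described above, not CSS code: the `ToyWorker` and `ToyMaster` classes, their method names, and the modulo-based partition assignment are all hypothetical, and the real worker persists data to disk or HDFS asynchronously rather than joining buffers in memory.

```python
from collections import defaultdict


class ToyWorker:
    """Hypothetical stand-in for a CSS worker."""

    def __init__(self):
        self.buffers = defaultdict(list)  # partition id -> pushed chunks (in memory)
        self.committed = {}               # partition id -> committed bytes

    def push(self, partition, data):
        # Map tasks push shuffled data; it is buffered first.
        self.buffers[partition].append(data)

    def commit(self):
        # Stand-in for "commit files": make buffered data fetchable.
        for partition, chunks in self.buffers.items():
            self.committed[partition] = b"".join(chunks)
        self.buffers.clear()

    def fetch(self, partition):
        # Reduce tasks fetch committed data for one partition.
        return self.committed[partition]


class ToyMaster:
    """Hypothetical stand-in for the CSS master inside the application."""

    def __init__(self, workers, num_map_tasks):
        self.workers = workers
        self.pending = set(range(num_map_tasks))

    def worker_for(self, partition):
        # Assumption: simple modulo assignment, chosen only for illustration.
        return self.workers[partition % len(self.workers)]

    def map_task_finished(self, map_id):
        # Track map progress; notify workers to commit once all maps are done.
        self.pending.discard(map_id)
        if not self.pending:
            for worker in self.workers:
                worker.commit()


workers = [ToyWorker(), ToyWorker()]
master = ToyMaster(workers, num_map_tasks=2)

for map_id in range(2):
    for partition in range(2):
        data = f"m{map_id}p{partition};".encode()
        master.worker_for(partition).push(partition, data)
    master.map_task_finished(map_id)

print(master.worker_for(0).fetch(0))
print(master.worker_for(1).fetch(1))
```

Nothing is fetchable until the last map task reports completion, which mirrors the master notifying workers to commit only after all map tasks have finished.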

Building CSS

CSS is built using Apache Maven. Building CSS using Maven requires Java 8 and either Scala 2.12 or Scala 2.11.

```bash
mvn -DskipTests clean package
```

Building a Runnable Distribution

To create a CSS distribution, use ./build.sh in the project root directory.

```bash
./build.sh
```

It generates a tgz package that you can copy to the nodes where you want to deploy CSS.

```
css-1.0.0-bin
├── LICENSE
├── README.md
├── client
├── conf
├── docs
├── lib     // CSS cluster lib
└── sbin
```

Deploy CSS Cluster

CSS provides two deployment modes: standalone and zookeeper. Standalone mode is currently intended only for testing, while zookeeper mode is used in production environments.

  1. Place the CSS tgz file built above on each node of the cluster.
  2. Unpack it to a directory and set the CSS_HOME environment variable to that directory. The default configuration, metrics settings, and workers list can be found in the ```$CSS_HOME/conf``` directory.
  3. Update $CSS_HOME/sbin/css-config.sh.

     ```
     # standalone mode
     CSS_MASTER_HOST=
     MASTER_JAVA_OPTS="-Xmx8192m"
     WORKER_JAVA_OPTS="-Xmx8192m -XX:MaxDirectMemorySize=100000m"

     # zookeeper mode
     WORKER_JAVA_OPTS="-Xmx8192m -XX:MaxDirectMemorySize=100000m"
     ```

  4. Update $CSS_HOME/conf/css-defaults.conf.

     ```
     css.cluster.name =

     # standalone(for testing) or zookeeper(for production)
     css.worker.registry.type = zookeeper
     # only for zookeeper mode
     css.zookeeper.address = :,:,:

     # css worker common conf
     css.flush.queue.capacity = 4096
     css.flush.buffer.size = 128k
     css.network.timeout = 600s
     css.epoch.rotate.threshold = 1g
     css.push.io.numConnectionsPerPeer = 8
     css.push.io.threads = 128
     css.replicate.threads = 128
     css.fetch.io.threads = 64
     css.fetch.chunk.size = 4m
     css.shuffle.server.chunkFetchHandlerThreadsPercent = 400

     # hdfs storage
     css.hdfsFlusher.base.dir = hdfs://xxx
     css.hdfsFlusher.num = -1
     css.hdfsFlusher.replica = 2

     # local disk storage
     css.diskFlusher.base.dirs = /data00/css,/data01/css
     css.disk.dir.num.min = 1
     ```

  5. Define your metrics and worker node hosts in the following files:

     ```
     $CSS_HOME/conf/css-metrics.properties
     $CSS_HOME/conf/workers
     ```

  6. Sync all the updated config files to each node of the cluster.
  7. Start the CSS cluster shuffle workers. The script will ssh into each CSS worker node and start the workers.

     ```
     # standalone mode
     cd $CSS_HOME;bash ./sbin/start-all.sh
     # zookeeper mode
     cd $CSS_HOME;bash ./sbin/start-workers.sh
     ```
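Before syncing the config files to every node, it can help to sanity-check css-defaults.conf. The following is a minimal sketch in Python, assuming the file uses the `key = value` lines and `#` comments shown above. The helper names `parse_css_conf` and `check_css_conf` are hypothetical and not part of CSS, the checks are a convenience rather than rules CSS itself is known to enforce, and the sample cluster name and ZooKeeper hosts are placeholders.

```python
def parse_css_conf(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    conf = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        conf[key.strip()] = value.strip()
    return conf


def check_css_conf(conf):
    """Return a list of human-readable problems (empty if none were found)."""
    problems = []
    if not conf.get("css.cluster.name"):
        problems.append("css.cluster.name is empty")
    # 'standalone' is the documented default for css.worker.registry.type.
    mode = conf.get("css.worker.registry.type", "standalone")
    if mode not in ("standalone", "zookeeper"):
        problems.append(f"unknown css.worker.registry.type: {mode}")
    if mode == "zookeeper" and not conf.get("css.zookeeper.address"):
        problems.append("css.zookeeper.address is required in zookeeper mode")
    return problems


# Placeholder values for illustration only.
sample = """
css.cluster.name = my-css
# standalone(for testing) or zookeeper(for production)
css.worker.registry.type = zookeeper
css.zookeeper.address = zk1:2181,zk2:2181,zk3:2181
css.diskFlusher.base.dirs = /data00/css,/data01/css
"""

conf = parse_css_conf(sample)
print(check_css_conf(conf))
```

To check a real deployment, read the file content from `$CSS_HOME/conf/css-defaults.conf` and pass it to `parse_css_conf`.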

Running with Spark

  1. Copy $CSS_HOME/client/spark-${version}/*.jar to $SPARK_HOME/jars/.
  2. Run Spark with CSS.

     ```
     # standalone mode
     --conf spark.css.cluster.name= \
     --conf spark.css.master.address=css://: \
     --conf spark.shuffle.manager=org.apache.spark.shuffle.css.CssShuffleManager \

     # zookeeper mode
     --conf spark.css.cluster.name= \
     --conf spark.css.zookeeper.address=":,:,:" \
     --conf spark.shuffle.manager=org.apache.spark.shuffle.css.CssShuffleManager \
     ```
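When jobs are launched from scripts, the `--conf` flags above can be generated instead of typed by hand. The sketch below is one possible way to do that in Python. The property names and the shuffle manager class come from the commands above; the helper names `css_spark_conf` and `to_submit_args`, the cluster name, and the host and port values are hypothetical placeholders.

```python
SHUFFLE_MANAGER = "org.apache.spark.shuffle.css.CssShuffleManager"


def css_spark_conf(cluster_name, mode, address):
    """Build the Spark properties for running with CSS in the given mode."""
    conf = {
        "spark.css.cluster.name": cluster_name,
        "spark.shuffle.manager": SHUFFLE_MANAGER,
    }
    if mode == "standalone":
        # Address of the CSS master, in the css://... form shown above.
        conf["spark.css.master.address"] = address
    elif mode == "zookeeper":
        # Comma-separated ZooKeeper address list.
        conf["spark.css.zookeeper.address"] = address
    else:
        raise ValueError(f"unknown mode: {mode}")
    return conf


def to_submit_args(conf):
    """Turn a property dict into a flat list of spark-submit arguments."""
    args = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args


# Placeholder cluster name and addresses, for illustration only.
zk_conf = css_spark_conf("my-css", "zookeeper", "zk1:2181,zk2:2181,zk3:2181")
print(" ".join(to_submit_args(zk_conf)))
```

The resulting list can be appended to a `spark-submit` command line, for example through `subprocess.run(["spark-submit", *args, ...])`.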

Spark Adaptive Query Execution Support

CSS supports all the features of AQE. To support skew join optimization, apply the patch file ./patch/spark-3.0-aqe-skewjoin.patch to Spark and rebuild Spark.

Configuration

CSS Cluster Server

All detailed configuration can be found in the CssConf class.

| Property Name | Default | Meaning |
|---|---|---|
| css.cluster.name | - | The cluster name for the CSS cluster. |
| css.worker.registry.type | standalone | The worker registry type (e.g. standalone, zookeeper). This will also specify if CSS will run under Standalone or zookeeper mode. |
| css.zookeeper.address | - | (For zookeeper mode) The CSS zookeeper address. |
| css.push.io.threads | 32 | The CSS Threads for netty push data io. |
| css.fetch.io.threads | 32 | The CSS Threads for netty fetch data io. |
| css.commit.threads | 128 | The CSS Threads for stage end to close partition file. |
| css.diskFlusher.base.dirs | /tmp/css | The CSS Disk Base dirs (e.g. /data00/css,/data01/css). |
| css.hdfsFlusher.base.dir | - | The CSS HDFS Base dir (e.g. hdfs://xxx). |

CSS Spark Client

| Property Name | Default | Meaning |
|---|---|---|
| css.max.allocate.worker | 1000 | The Maximum number of workers requested for shuffling. |
| css.worker.allocate.extraRatio | 1.5 | The application can allocate additional workers controlled by this extra ratio, the final number will be calculated with Min(Max(2, targetWorker), MaxAllocateWorker). |
| css.backpressure.enabled | true | The back pressure control, when enabled, it will use Gradient2Limit to control push data rate, otherwise use FixedLimit. |
| css.fixRateLimit.threshold | 64 | Fixed Rate for the back pressure control. |
| css.data.io.threads | 8 | The Maximum client side data sending for netty thread. |
| css.maxPartitionsPerGroup | 100 | The Maximum number of partitions per group, each data push will send one group at a time. |
| css.partitionGroup.push.buffer.size | 4m | The Maximum buffer size sent per each data push, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). |
| css.client.mapper.end.timeout | 600s | The Maximum timeout to wait for all data to be sent before mapTask ends. |
| css.stage.end.timeout | 600s | The Maximum timeout to wait for all partition files to close. |
| css.sortPush.spill.record.threshold | 1000000 | The Maximum records for sending data. |
| css.sortPush.spill.size.threshold | 256m | The Maximum size for sending data, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). |
| css.shuffle.mode | DISK | Choose which storage mode to use (e.g. DISK, HDFS). |
| css.epoch.rotate.threshold | 1g | The file auto rotate switch threshold size for new files, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). |
| css.client.failed.batch.blacklist.enabled | true | When MapTask encounters onFailure, the current reduceId-epochId-mapId-mapAttemptId-batchId will be recorded into the blacklist. In AE skewjoin mode, this switch must be turned on, otherwise there will be correctness problems. |
| css.compression.codec | lz4 | It is recommended to use zstd compression mode. Compared with lz4, it can improve the compression ratio by 30%, and only consume an additional 8% of performance. |
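Two entries in the table involve a little arithmetic: the worker count clamp `Min(Max(2, targetWorker), MaxAllocateWorker)` and the size strings with a unit suffix. The Python sketch below works through both. It is illustrative only and not taken from the CSS source. The table does not say how `targetWorker` is derived, so `target_from_base` (a base estimate scaled by the extra ratio and rounded up) is an assumption, as are the function names, the binary (1024-based) multipliers, and the treatment of a bare number as bytes.

```python
import math

# Assumption: binary multipliers, as used by JVM memory options such as -Xmx.
_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}


def parse_size(text):
    """Convert a size string such as '4m' or '1g' to a number of bytes."""
    text = text.strip().lower()
    if text and text[-1] in _UNITS:
        return int(text[:-1]) * _UNITS[text[-1]]
    return int(text)  # assumption: no suffix means plain bytes


def final_worker_count(target_worker, max_allocate_worker=1000):
    """Clamp as in the table: Min(Max(2, targetWorker), MaxAllocateWorker)."""
    return min(max(2, target_worker), max_allocate_worker)


def target_from_base(base_workers, extra_ratio=1.5):
    """Hypothetical derivation of targetWorker from a base estimate."""
    return math.ceil(base_workers * extra_ratio)


print(parse_size("4m"))                          # css.partitionGroup.push.buffer.size default
print(final_worker_count(target_from_base(4)))   # small job
print(final_worker_count(target_from_base(1)))   # never fewer than 2 workers
print(final_worker_count(5000))                  # capped by css.max.allocate.worker
```

The clamp means that even a very small job is given at least two workers, while a very large one never exceeds css.max.allocate.worker.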

Contribution

Please check Contributing for more details.

Code of Conduct

Please check Code of Conduct for more details.

Security

If you discover a potential security issue in this project, or think you may have discovered a security issue, we ask that you notify Bytedance Security via our security center or vulnerability reporting email.

Please do not create a public GitHub issue.

License

This project is licensed under the Apache-2.0 License.

Owner

  • Name: Bytedance Inc.
  • Login: bytedance
  • Kind: organization
  • Location: Singapore

GitHub Events

Total
  • Watch event: 13
  • Fork event: 2
Last Year
  • Watch event: 13
  • Fork event: 2

Committers

Last synced: about 1 year ago

All Time
  • Total Commits: 1
  • Total Committers: 1
  • Avg Commits per committer: 1.0
  • Development Distribution Score (DDS): 0.0
Past Year
  • Commits: 0
  • Committers: 0
  • Avg Commits per committer: 0.0
  • Development Distribution Score (DDS): 0.0
Top Committers
Name Email Commits
weizhongjia w****a@b****m 1

Issues and Pull Requests

Last synced: about 1 year ago

All Time
  • Total issues: 9
  • Total pull requests: 8
  • Average time to close issues: N/A
  • Average time to close pull requests: 4 minutes
  • Total issue authors: 7
  • Total pull request authors: 4
  • Average comments per issue: 2.11
  • Average comments per pull request: 0.0
  • Merged pull requests: 0
  • Bot issues: 0
  • Bot pull requests: 0
Past Year
  • Issues: 0
  • Pull requests: 0
  • Average time to close issues: N/A
  • Average time to close pull requests: N/A
  • Issue authors: 0
  • Pull request authors: 0
  • Average comments per issue: 0
  • Average comments per pull request: 0
  • Merged pull requests: 0
  • Bot issues: 0
  • Bot pull requests: 0
Top Authors
Issue Authors
  • Lobo2008 (2)
  • long1208 (1)
  • CHENXCHEN (1)
  • xiaowuzi010599 (1)
  • a140262 (1)
  • gaoyajun02 (1)
  • xcl1102 (1)
Pull Request Authors
  • king0226-ttree (2)
  • GOptimistic (2)
  • JohnBenjamnHarny (2)
  • zhmin (1)

Dependencies

.github/workflows/ut.yml actions
  • actions/cache v1 composite
  • actions/checkout v2 composite
  • actions/setup-java v1 composite
  • ashley-taylor/junit-report-annotations-action master composite
api/pom.xml maven
  • log4j:log4j provided
  • org.slf4j:jcl-over-slf4j provided
  • org.slf4j:jul-to-slf4j provided
  • org.slf4j:slf4j-api provided
  • org.slf4j:slf4j-log4j12 provided
client/pom.xml maven
  • org.apache.hadoop:hadoop-client provided
  • com.bytedance.inf:css-common_${scala.binary.version} ${project.version}
  • com.github.luben:zstd-jni
  • com.netflix.concurrency-limits:concurrency-limits-core
  • org.lz4:lz4-java
  • junit:junit test
  • org.apache.commons:commons-lang3 test
common/pom.xml maven
  • com.bytedance.inf:css-network-common ${project.version}
  • com.google.guava:guava
  • io.dropwizard.metrics:metrics-core
  • io.dropwizard.metrics:metrics-graphite
  • io.dropwizard.metrics:metrics-jmx
  • io.dropwizard.metrics:metrics-jvm
  • javax.servlet:javax.servlet-api
  • log4j:log4j
  • org.scala-lang:scala-library ${scala.version}
  • org.slf4j:jcl-over-slf4j
  • org.slf4j:jul-to-slf4j
  • org.slf4j:slf4j-api
  • org.slf4j:slf4j-log4j12
  • junit:junit test
  • org.mockito:mockito-core test
  • org.scalatest:scalatest_${scala.binary.version} test
css-assembly_2/pom.xml maven
  • com.bytedance.inf:css-service_${scala.binary.version} ${project.version}
  • com.bytedance.inf:shuffle-manager-2 ${project.version}
css-assembly_3/pom.xml maven
  • com.bytedance.inf:css-service_${scala.binary.version} ${project.version}
  • com.bytedance.inf:shuffle-manager-3 ${project.version}
network-common/pom.xml maven
  • com.google.guava:guava compile
  • org.slf4j:slf4j-api provided
  • com.fasterxml.jackson.core:jackson-annotations
  • com.fasterxml.jackson.core:jackson-databind
  • com.google.code.findbugs:jsr305
  • io.dropwizard.metrics:metrics-core
  • io.dropwizard.metrics:metrics-jvm
  • io.netty:netty-all
  • org.apache.commons:commons-crypto
  • org.apache.commons:commons-lang3
  • org.fusesource.leveldbjni:leveldbjni-all 1.8
  • junit:junit test
  • log4j:log4j test
  • org.mockito:mockito-core test
  • org.slf4j:slf4j-log4j12 test
pom.xml maven
  • com.google.code.findbugs:jsr305 3.0.0 compile
  • org.apache.commons:commons-crypto 1.0.0 compile
  • org.apache.hadoop:hadoop-client 2.6.0 compile
  • org.apache.hadoop:hadoop-common 2.6.0 compile
  • org.fusesource.leveldbjni:leveldbjni-all 1.8 compile
  • com.fasterxml.jackson.core:jackson-annotations 2.10.0
  • com.fasterxml.jackson.core:jackson-databind 2.10.0
  • com.github.luben:zstd-jni 1.5.0-4
  • com.google.guava:guava 14.0.1
  • com.netflix.concurrency-limits:concurrency-limits-core 0.3.6
  • io.dropwizard.metrics:metrics-core 4.2.2
  • io.dropwizard.metrics:metrics-graphite 4.2.2
  • io.dropwizard.metrics:metrics-jmx 4.2.2
  • io.dropwizard.metrics:metrics-jvm 4.2.2
  • io.netty:netty-all 4.1.47.Final
  • javax.servlet:javax.servlet-api 3.1.0
  • log4j:log4j 1.2.17
  • org.apache.commons:commons-lang3 3.9
  • org.apache.curator:curator-framework 2.7.1
  • org.apache.spark:spark-tags_${scala.binary.version} 3.0.1
  • org.lz4:lz4-java 1.7.1
  • org.slf4j:jcl-over-slf4j 1.7.30
  • org.slf4j:jul-to-slf4j 1.7.30
  • org.slf4j:slf4j-api 1.7.30
  • org.slf4j:slf4j-log4j12 1.7.30
  • junit:junit 4.12 test
  • org.apache.curator:curator-test 2.7.1 test
  • org.mockito:mockito-core 1.10.19 test
  • org.scalatest:scalatest_${scala.binary.version} 3.0.8 test
service/pom.xml maven
  • com.bytedance.inf:css-api ${project.version}
  • com.bytedance.inf:css-common_${scala.binary.version} ${project.version}
  • org.apache.curator:curator-framework
  • org.apache.hadoop:hadoop-client
  • org.apache.hadoop:hadoop-common
  • com.bytedance.inf:css-client_${scala.binary.version} ${project.version} test
  • junit:junit test
  • org.apache.curator:curator-test test
  • org.pegdown:pegdown 1.4.2 test
  • org.scalatest:scalatest_${scala.binary.version} test
spark-shuffle-manager-2/pom.xml maven
  • com.bytedance.inf:css-common_${scala.binary.version} ${project.version} compile
  • org.apache.hadoop:hadoop-common ${hadoop.version} provided
  • org.apache.spark:spark-core_${scala.binary.version} ${spark.version} provided
  • org.apache.spark:spark-sql_${scala.binary.version} ${spark.version} provided
  • org.scala-lang:scala-library ${scala.version} provided
  • com.bytedance.inf:css-api ${project.version}
  • com.bytedance.inf:css-client_${scala.binary.version} ${project.version}
  • junit:junit test
  • org.scalatest:scalatest_${scala.binary.version} test
spark-shuffle-manager-3/pom.xml maven
  • org.apache.spark:spark-core_${scala.binary.version} ${spark.version} provided
  • org.apache.spark:spark-sql_${scala.binary.version} ${spark.version} provided
  • org.scala-lang:scala-library ${scala.version} provided
  • com.bytedance.inf:css-api ${project.version}
  • com.bytedance.inf:css-client_${scala.binary.version} ${project.version}
  • junit:junit test
  • org.scalatest:scalatest_${scala.binary.version} test