https://github.com/bytedance/cloudshuffleservice
Cloud Shuffle Service(CSS) is a general purpose remote shuffle solution for compute engines, including Spark/Flink/MapReduce.
Statistics
- Stars: 255
- Watchers: 9
- Forks: 58
- Open Issues: 10
- Releases: 0
Metadata Files
README.md
Cloud Shuffle Service
Cloud Shuffle Service (CSS) is a general-purpose remote shuffle solution for compute engines, including Spark, Flink, and MapReduce. It provides reliable, high-performance, and elastic data shuffling capabilities to these compute engines. Shuffled data is pushed to the CSS cluster, stored on local disk or in HDFS, and can then be fetched from the CSS cluster by the compute engines.
- CSS Worker
Stores shuffled data pushed from map tasks in memory and persists it to the file system asynchronously, so that reduce tasks can later fetch the data from CSS workers.
- CSS Master
CSS Master is the coordinator for an application's shuffle process and is integrated into the application. It reads the CSS worker list from ZooKeeper, assigns workers to the application for shuffling, tracks the progress of running map tasks, and notifies CSS workers to commit files after all map tasks have finished.
- CSS Client
Map and reduce tasks use the CSS client to push shuffled data to, and fetch it from, the assigned CSS workers.
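The interaction between the three components can be illustrated with a toy, in-memory model. This is only a sketch under stated assumptions, not the CSS implementation: the class names, the modulo-based worker assignment, and the synchronous `commit` are all hypothetical simplifications (real workers persist data asynchronously to disk or HDFS).

```python
from collections import defaultdict


class Worker:
    """Toy stand-in for a CSS worker: buffers pushed data, then commits it."""

    def __init__(self, name):
        self.name = name
        self.buffer = defaultdict(list)  # partition id -> records held in memory
        self.committed = {}              # partition id -> records visible to reducers

    def push(self, partition, record):
        self.buffer[partition].append(record)

    def commit(self):
        # Stands in for flushing buffered data to storage and closing the files.
        for partition, records in self.buffer.items():
            self.committed[partition] = list(records)
        self.buffer.clear()

    def fetch(self, partition):
        return self.committed.get(partition, [])


class Master:
    """Toy stand-in for the CSS master: assigns workers and tracks map tasks."""

    def __init__(self, workers, num_map_tasks):
        self.workers = workers
        self.pending_maps = set(range(num_map_tasks))

    def worker_for(self, partition):
        # Hypothetical assignment rule: partition id modulo worker count.
        return self.workers[partition % len(self.workers)]

    def map_task_finished(self, map_id):
        self.pending_maps.discard(map_id)
        if not self.pending_maps:
            # All map tasks are done, so ask every worker to commit its files.
            for worker in self.workers:
                worker.commit()


def run_demo():
    workers = [Worker("w0"), Worker("w1")]
    master = Master(workers, num_map_tasks=2)
    # Each map task pushes one record to every partition.
    for map_id in range(2):
        for partition in range(3):
            master.worker_for(partition).push(partition, f"m{map_id}-p{partition}")
        master.map_task_finished(map_id)
    # Reduce tasks fetch their partition from the assigned worker.
    return {p: master.worker_for(p).fetch(p) for p in range(3)}
```

In this model, data becomes fetchable only after the last map task reports completion, which mirrors the commit notification described for the CSS Master.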
Building CSS
mvn build
CSS is built using Apache Maven. Building CSS using Maven requires Java 8 and either Scala 2.12 or Scala 2.11.
```bash
mvn -DskipTests clean package
```
Building a Runnable Distribution
To create a CSS distribution, use ./build.sh in the project root directory.
```bash
./build.sh
```
This generates a tgz package that you can copy to the nodes where you want to deploy CSS.
```
css-1.0.0-bin
├── LICENSE
├── README.md
├── client
├── conf
├── docs
├── lib // CSS cluster lib
└── sbin
```
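After unpacking the package on a node, a quick sanity check of the layout can catch an incomplete copy. The helper below is a hypothetical sketch, not part of CSS; it only checks for the top-level entries listed in the tree above.

```python
from pathlib import Path

# Top-level entries shown in the distribution tree above.
EXPECTED_ENTRIES = ["LICENSE", "README.md", "client", "conf", "docs", "lib", "sbin"]


def missing_entries(dist_root):
    """Return the expected top-level entries that are absent under dist_root."""
    root = Path(dist_root)
    return [name for name in EXPECTED_ENTRIES if not (root / name).exists()]
```

Calling `missing_entries("css-1.0.0-bin")` on a complete distribution should return an empty list.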
Deploy CSS Cluster
CSS provides two deployment modes: standalone and ZooKeeper. Standalone mode is currently intended only for testing, while ZooKeeper mode is used in production.
1. Place the CSS tgz file built above on each node of the cluster.
2. Unpack it to a directory and set the `CSS_HOME` environment variable to that directory. All default configuration, metrics, and worker list files can be found in the `$CSS_HOME/conf` directory.
3. Update `$CSS_HOME/sbin/css-config.sh`.
```
# standalone mode
CSS_MASTER_HOST=
MASTER_JAVA_OPTS="-Xmx8192m"
WORKER_JAVA_OPTS="-Xmx8192m -XX:MaxDirectMemorySize=100000m"

# zookeeper mode
WORKER_JAVA_OPTS="-Xmx8192m -XX:MaxDirectMemorySize=100000m"
```
4. Update `$CSS_HOME/conf/css-defaults.conf`.
```
css.cluster.name =
# standalone(for testing) or zookeeper(for production)
css.worker.registry.type = zookeeper
# only for zookeeper mode
css.zookeeper.address =

# css worker common conf
css.flush.queue.capacity = 4096
css.flush.buffer.size = 128k
css.network.timeout = 600s
css.epoch.rotate.threshold = 1g
css.push.io.numConnectionsPerPeer = 8
css.push.io.threads = 128
css.replicate.threads = 128
css.fetch.io.threads = 64
css.fetch.chunk.size = 4m
css.shuffle.server.chunkFetchHandlerThreadsPercent = 400

# hdfs storage
css.hdfsFlusher.base.dir = hdfs://xxx
css.hdfsFlusher.num = -1
css.hdfsFlusher.replica = 2

# local disk storage
css.diskFlusher.base.dirs = /data00/css,/data01/css
css.disk.dir.num.min = 1
```
5. Define your metrics and worker node hosts in the following files:
```
$CSS_HOME/conf/css-metrics.properties
$CSS_HOME/conf/workers
```
6. Sync all the updated config files to each node of the cluster.
7. Start the CSS cluster shuffle workers. The script will SSH into each CSS worker node and start the workers.
```
# standalone mode
cd $CSS_HOME; bash ./sbin/start-all.sh
# zookeeper mode
cd $CSS_HOME; bash ./sbin/start-workers.sh
```
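Before syncing the configuration to every node, it can help to check the file for obvious omissions. The following sketch is not a CSS tool; it parses `key = value` lines in the style of `css-defaults.conf` and applies a few checks. Treating `css.cluster.name` as required and reading size suffixes as binary multiples (1k = 1024 bytes) are assumptions made for this example.

```python
SIZE_UNITS = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}


def parse_conf(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        conf[key.strip()] = value.strip()
    return conf


def size_to_bytes(value):
    """Convert a size string such as '128k' or '1g' to bytes (binary units assumed)."""
    value = value.strip().lower()
    if value and value[-1] in SIZE_UNITS:
        return int(value[:-1]) * SIZE_UNITS[value[-1]]
    return int(value)


def check_conf(conf):
    """Return a list of problems found in a parsed configuration."""
    problems = []
    if not conf.get("css.cluster.name"):
        # Assumption: a cluster name is always needed.
        problems.append("css.cluster.name is empty")
    registry = conf.get("css.worker.registry.type", "standalone")
    if registry not in ("standalone", "zookeeper"):
        problems.append(f"unknown css.worker.registry.type: {registry}")
    if registry == "zookeeper" and not conf.get("css.zookeeper.address"):
        problems.append("css.zookeeper.address is required in zookeeper mode")
    return problems
```

For example, `check_conf(parse_conf(open(path).read()))` returns an empty list when none of these checks fail, where `path` points at your `css-defaults.conf`.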
Running with Spark
1. Copy `$CSS_HOME/client/spark-${version}/*.jar` to `$SPARK_HOME/jars/`.
2. Run Spark with CSS.
```
# standalone mode
--conf spark.css.cluster.name= \
--conf spark.css.master.address=css:// : \
--conf spark.shuffle.manager=org.apache.spark.shuffle.css.CssShuffleManager \
# zookeeper mode
--conf spark.css.cluster.name=
```
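When jobs are launched from a script, the standalone-mode options can be assembled programmatically. This is a minimal sketch that uses only the three property names shown above; the function name is hypothetical, and the cluster name and master address passed to it are placeholders you must supply.

```python
def css_spark_conf_args(cluster_name, master_address):
    """Build spark-submit '--conf' arguments for CSS standalone mode."""
    conf = {
        "spark.css.cluster.name": cluster_name,
        "spark.css.master.address": master_address,
        "spark.shuffle.manager": "org.apache.spark.shuffle.css.CssShuffleManager",
    }
    args = []
    for key, value in conf.items():
        # Each setting becomes a '--conf key=value' pair.
        args += ["--conf", f"{key}={value}"]
    return args
```

The returned list can be appended to a `spark-submit` command line, for example via `subprocess.run(["spark-submit", *args, ...])`.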
Spark Adaptive Query Execution Support
CSS supports all the features of AQE. To support skew join optimization, you need to apply the following patch to Spark and rebuild Spark.
./patch/spark-3.0-aqe-skewjoin.patch
Configuration
CSS Cluster Server
All detailed configuration can be found in the CssConf class.
| Property Name | Default | Meaning |
|---|---|---|
| css.cluster.name | - | The cluster name for the CSS cluster. |
| css.worker.registry.type | standalone | The worker registry type (e.g. standalone, zookeeper). This will also specify if CSS will run under Standalone or zookeeper mode. |
| css.zookeeper.address | - | (For zookeeper mode) The CSS zookeeper address. |
| css.push.io.threads | 32 | The CSS Threads for netty push data io. |
| css.fetch.io.threads | 32 | The CSS Threads for netty fetch data io. |
| css.commit.threads | 128 | The CSS Threads for stage end to close partition file. |
| css.diskFlusher.base.dirs | /tmp/css | The CSS Disk Base dirs (e.g. /data00/css,/data01/css). |
| css.hdfsFlusher.base.dir | - | The CSS HDFS Base dir (e.g. hdfs://xxx). |
CSS Spark Client
| Property Name | Default | Meaning |
|---|---|---|
| css.max.allocate.worker | 1000 | The Maximum number of workers requested for shuffling. |
| css.worker.allocate.extraRatio | 1.5 | The application can allocate additional workers controlled by this extra ratio, the final number will be calculated with Min(Max(2, targetWorker), MaxAllocateWorker). |
| css.backpressure.enabled | true | The back pressure control, when enabled, it will use Gradient2Limit to control push data rate, otherwise use FixedLimit. |
| css.fixRateLimit.threshold | 64 | Fixed Rate for the back pressure control. |
| css.data.io.threads | 8 | The Maximum client side data sending for netty thread. |
| css.maxPartitionsPerGroup | 100 | The Maximum number of partitions per group, each data push will send one group at a time. |
| css.partitionGroup.push.buffer.size | 4m | The Maximum buffer size sent per each data push, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). |
| css.client.mapper.end.timeout | 600s | The Maximum timeout to wait for all data to be sent before mapTask ends. |
| css.stage.end.timeout | 600s | The Maximum timeout to wait for all partition files to close. |
| css.sortPush.spill.record.threshold | 1000000 | The Maximum records for sending data. |
| css.sortPush.spill.size.threshold | 256m | The Maximum size for sending data, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). |
| css.shuffle.mode | DISK | Choose which storage mode to use (e.g. DISK, HDFS). |
| css.epoch.rotate.threshold | 1g | The file auto rotate switch threshold size for new files, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). |
| css.client.failed.batch.blacklist.enabled | true | When MapTask encounters onFailure, the current reduceId-epochId-mapId-mapAttemptId-batchId will be recorded into the blacklist. In AE skewjoin mode, this switch must be turned on, otherwise there will be correctness problems. |
| css.compression.codec | lz4 | It is recommended to use zstd compression mode. Compared with lz4, it can improve the compression ratio by 30%, and only consume an additional 8% of performance. |
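The clamp described for `css.worker.allocate.extraRatio` can be written out as a one-line function. This sketch covers only the final `Min(Max(2, targetWorker), MaxAllocateWorker)` step from the table; how `targetWorker` itself is derived from the extra ratio is not specified here, so it is taken as an input. The function name is hypothetical.

```python
def workers_to_allocate(target_worker, max_allocate_worker=1000):
    """Clamp the worker count: Min(Max(2, targetWorker), MaxAllocateWorker).

    The default of 1000 matches the css.max.allocate.worker default in the table.
    """
    return min(max(2, target_worker), max_allocate_worker)
```

With the default limit, a target of 1 is raised to 2, a target of 50 is kept as 50, and a target of 5000 is capped at 1000.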
Contribution
Please check Contributing for more details.
Code of Conduct
Please check Code of Conduct for more details.
Security
If you discover a potential security issue in this project, or think you may have discovered a security issue, we ask that you notify Bytedance Security via our security center or vulnerability reporting email.
Please do not create a public GitHub issue.
License
This project is licensed under the Apache-2.0 License.
Owner
- Name: Bytedance Inc.
- Login: bytedance
- Kind: organization
- Location: Singapore
- Website: https://opensource.bytedance.com
- Twitter: ByteDanceOSS
- Repositories: 255
- Profile: https://github.com/bytedance
GitHub Events
Total
- Watch event: 13
- Fork event: 2
Last Year
- Watch event: 13
- Fork event: 2
Committers
Last synced: about 1 year ago
Top Committers
| Name | Email | Commits |
|---|---|---|
| weizhongjia | w****a@b****m | 1 |
Issues and Pull Requests
Last synced: about 1 year ago
All Time
- Total issues: 9
- Total pull requests: 8
- Average time to close issues: N/A
- Average time to close pull requests: 4 minutes
- Total issue authors: 7
- Total pull request authors: 4
- Average comments per issue: 2.11
- Average comments per pull request: 0.0
- Merged pull requests: 0
- Bot issues: 0
- Bot pull requests: 0
Past Year
- Issues: 0
- Pull requests: 0
- Average time to close issues: N/A
- Average time to close pull requests: N/A
- Issue authors: 0
- Pull request authors: 0
- Average comments per issue: 0
- Average comments per pull request: 0
- Merged pull requests: 0
- Bot issues: 0
- Bot pull requests: 0
Top Authors
Issue Authors
- Lobo2008 (2)
- long1208 (1)
- CHENXCHEN (1)
- xiaowuzi010599 (1)
- a140262 (1)
- gaoyajun02 (1)
- xcl1102 (1)
Pull Request Authors
- king0226-ttree (2)
- GOptimistic (2)
- JohnBenjamnHarny (2)
- zhmin (1)
Dependencies
- actions/cache v1 composite
- actions/checkout v2 composite
- actions/setup-java v1 composite
- ashley-taylor/junit-report-annotations-action master composite
- log4j:log4j provided
- org.slf4j:jcl-over-slf4j provided
- org.slf4j:jul-to-slf4j provided
- org.slf4j:slf4j-api provided
- org.slf4j:slf4j-log4j12 provided
- org.apache.hadoop:hadoop-client provided
- com.bytedance.inf:css-common_${scala.binary.version} ${project.version}
- com.github.luben:zstd-jni
- com.netflix.concurrency-limits:concurrency-limits-core
- org.lz4:lz4-java
- junit:junit test
- org.apache.commons:commons-lang3 test
- com.bytedance.inf:css-network-common ${project.version}
- com.google.guava:guava
- io.dropwizard.metrics:metrics-core
- io.dropwizard.metrics:metrics-graphite
- io.dropwizard.metrics:metrics-jmx
- io.dropwizard.metrics:metrics-jvm
- javax.servlet:javax.servlet-api
- log4j:log4j
- org.scala-lang:scala-library ${scala.version}
- org.slf4j:jcl-over-slf4j
- org.slf4j:jul-to-slf4j
- org.slf4j:slf4j-api
- org.slf4j:slf4j-log4j12
- junit:junit test
- org.mockito:mockito-core test
- org.scalatest:scalatest_${scala.binary.version} test
- com.bytedance.inf:css-service_${scala.binary.version} ${project.version}
- com.bytedance.inf:shuffle-manager-2 ${project.version}
- com.bytedance.inf:css-service_${scala.binary.version} ${project.version}
- com.bytedance.inf:shuffle-manager-3 ${project.version}
- com.google.guava:guava compile
- org.slf4j:slf4j-api provided
- com.fasterxml.jackson.core:jackson-annotations
- com.fasterxml.jackson.core:jackson-databind
- com.google.code.findbugs:jsr305
- io.dropwizard.metrics:metrics-core
- io.dropwizard.metrics:metrics-jvm
- io.netty:netty-all
- org.apache.commons:commons-crypto
- org.apache.commons:commons-lang3
- org.fusesource.leveldbjni:leveldbjni-all 1.8
- junit:junit test
- log4j:log4j test
- org.mockito:mockito-core test
- org.slf4j:slf4j-log4j12 test
- com.google.code.findbugs:jsr305 3.0.0 compile
- org.apache.commons:commons-crypto 1.0.0 compile
- org.apache.hadoop:hadoop-client 2.6.0 compile
- org.apache.hadoop:hadoop-common 2.6.0 compile
- org.fusesource.leveldbjni:leveldbjni-all 1.8 compile
- com.fasterxml.jackson.core:jackson-annotations 2.10.0
- com.fasterxml.jackson.core:jackson-databind 2.10.0
- com.github.luben:zstd-jni 1.5.0-4
- com.google.guava:guava 14.0.1
- com.netflix.concurrency-limits:concurrency-limits-core 0.3.6
- io.dropwizard.metrics:metrics-core 4.2.2
- io.dropwizard.metrics:metrics-graphite 4.2.2
- io.dropwizard.metrics:metrics-jmx 4.2.2
- io.dropwizard.metrics:metrics-jvm 4.2.2
- io.netty:netty-all 4.1.47.Final
- javax.servlet:javax.servlet-api 3.1.0
- log4j:log4j 1.2.17
- org.apache.commons:commons-lang3 3.9
- org.apache.curator:curator-framework 2.7.1
- org.apache.spark:spark-tags_${scala.binary.version} 3.0.1
- org.lz4:lz4-java 1.7.1
- org.slf4j:jcl-over-slf4j 1.7.30
- org.slf4j:jul-to-slf4j 1.7.30
- org.slf4j:slf4j-api 1.7.30
- org.slf4j:slf4j-log4j12 1.7.30
- junit:junit 4.12 test
- org.apache.curator:curator-test 2.7.1 test
- org.mockito:mockito-core 1.10.19 test
- org.scalatest:scalatest_${scala.binary.version} 3.0.8 test
- com.bytedance.inf:css-api ${project.version}
- com.bytedance.inf:css-common_${scala.binary.version} ${project.version}
- org.apache.curator:curator-framework
- org.apache.hadoop:hadoop-client
- org.apache.hadoop:hadoop-common
- com.bytedance.inf:css-client_${scala.binary.version} ${project.version} test
- junit:junit test
- org.apache.curator:curator-test test
- org.pegdown:pegdown 1.4.2 test
- org.scalatest:scalatest_${scala.binary.version} test
- com.bytedance.inf:css-common_${scala.binary.version} ${project.version} compile
- org.apache.hadoop:hadoop-common ${hadoop.version} provided
- org.apache.spark:spark-core_${scala.binary.version} ${spark.version} provided
- org.apache.spark:spark-sql_${scala.binary.version} ${spark.version} provided
- org.scala-lang:scala-library ${scala.version} provided
- com.bytedance.inf:css-api ${project.version}
- com.bytedance.inf:css-client_${scala.binary.version} ${project.version}
- junit:junit test
- org.scalatest:scalatest_${scala.binary.version} test
- org.apache.spark:spark-core_${scala.binary.version} ${spark.version} provided
- org.apache.spark:spark-sql_${scala.binary.version} ${spark.version} provided
- org.scala-lang:scala-library ${scala.version} provided
- com.bytedance.inf:css-api ${project.version}
- com.bytedance.inf:css-client_${scala.binary.version} ${project.version}
- junit:junit test
- org.scalatest:scalatest_${scala.binary.version} test