Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update: dynamic process number #588

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
"tcp": "php run.php -c src/tcp/phpunit.xml",
"tcp-server": "php run.php -c src/tcp-server/phpunit.xml",
"validator": "php run.php -c src/validator/phpunit.xml",
"websocket-server": "php run.php -c src/websocket-server/phpunit.xml"
"websocket-server": "php run.php -c src/websocket-server/phpunit.xml",
"process": "php run.php -c src/process/phpunit.xml"
}
}
24 changes: 9 additions & 15 deletions src/process/src/Annotation/Mapping/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,16 @@
* @Annotation
* @Target("CLASS")
* @Attributes({
* @Attribute("workerId", type="array"),
* @Attribute("workerNum", type="int"),
* })
*/
class Process
{
/**
* Default
*/
public const DEFAULT = -1;

/**
* @var array
* @var int
*/
private $workerId = [
self::DEFAULT
];
private $workerNum = 1;

/**
* Process constructor.
Expand All @@ -47,18 +41,18 @@ class Process
public function __construct(array $values)
{
if (isset($values['value'])) {
$this->workerId = (array)$values['value'];
$this->workerNum = (int)$values['value'];
}
if (isset($values['workerId'])) {
$this->workerId = (array)$values['workerId'];
if (isset($values['workerNum'])) {
$this->workerNum = (int)$values['workerNum'];
}
}

/**
* @return array
* @return int
*/
public function getWorkerId(): array
public function getWorkerNum(): int
{
return $this->workerId;
return $this->workerNum;
}
}
2 changes: 1 addition & 1 deletion src/process/src/Annotation/Parser/ProcessParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ProcessParser extends Parser
public function parse(int $type, $annotationObject): array
{
// Register
ProcessRegister::registerProcess($this->className, $annotationObject->getWorkerId());
ProcessRegister::registerProcess($this->className, $annotationObject->getWorkerNum());

return [$this->className, $this->className, Bean::SINGLETON, ''];
}
Expand Down
7 changes: 1 addition & 6 deletions src/process/src/ProcessPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ class ProcessPool
*/
private $pool;

/**
* @var int
*/
private $workerNum = 3;

/**
* @var int
*/
Expand Down Expand Up @@ -106,7 +101,7 @@ class ProcessPool
*/
public function start(): void
{
$this->pool = new Pool($this->workerNum, $this->ipcType, $this->msgQueueKey, $this->coroutine);
$this->pool = new Pool(ProcessRegister::getWorkerNum(), $this->ipcType, $this->msgQueueKey, $this->coroutine);
foreach ($this->on as $name => $listener) {
$listenerInterface = SwooleEvent::LISTENER_MAPPING[$name] ?? '';
if (empty($listenerInterface)) {
Expand Down
32 changes: 17 additions & 15 deletions src/process/src/ProcessRegister.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,21 @@ class ProcessRegister
*/
private static $process = [];

/**
* @var int
*/
private static $workerId = 0;

/**
* @param string $className
* @param array $workerIds
* @param int $workerNum
*
* @throws ProcessException
*/
public static function registerProcess(string $className, array $workerIds): void
public static function registerProcess(string $className, int $workerNum): void
{
foreach ($workerIds as $workerId) {
if (isset(self::$process[$workerId])) {
throw new ProcessException(sprintf('Worker process(%d) for process pool must be only one!', $workerId));
}

self::$process[$workerId]['class'] = $className;
for ($i = 0; $i < $workerNum; $i ++) {
self::$process[self::$workerId ++]['class'] = $className;
}
}

Expand All @@ -60,13 +61,14 @@ public static function getProcess(int $workerId): string
if (!empty($className)) {
return $className;
}
throw new ProcessException(sprintf('Worker process(%d) for process pool must be defined!', $workerId));
}

$default = ProcessAnnotation::DEFAULT;
$className = self::$process[$default]['class'] ?? '';
if (empty($className)) {
throw new ProcessException(sprintf('Worker process(%d) for process pool must be defined!', $workerId));
}

return $className;
/**
* @return int
*/
public static function getWorkerNum(): int
{
return count(self::$process);
}
}
41 changes: 41 additions & 0 deletions src/process/test/testing/AutoLoader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php declare(strict_types=1);
/**
* This file is part of Swoft.
*
* @link https://swoft.org
* @document https://swoft.org/docs
* @contact [email protected]
* @license https://github.com/swoft-cloud/swoft/blob/master/LICENSE
*/

namespace SwoftTest\Process\Testing;

use Swoft\SwoftComponent;

/**
* Class AutoLoader
*
* @since 2.0
*/
class AutoLoader extends SwoftComponent
{
/**
* Get namespace and dirs
*
* @return array
*/
public function getPrefixDirs(): array
{
return [
__NAMESPACE__ => __DIR__,
];
}

/**
* @return array
*/
public function metadata(): array
{
return [];
}
}
30 changes: 30 additions & 0 deletions src/process/test/testing/Process/Process1.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace SwoftTest\Process\Testing\Process;

use Swoft\Log\Helper\CLog;
use Swoft\Process\Annotation\Mapping\Process;
use Swoft\Process\Contract\ProcessInterface;
use Swoole\Coroutine;
use Swoole\Process\Pool;

/**
* Class Process1
*
* @Process(workerNum=3)
*/
class Process1 implements ProcessInterface
{

/**
* @param Pool $pool
* @param int $workerId
*/
public function run(Pool $pool, int $workerId): void
{
while (true) {
CLog::info('worker-' . $workerId);
Coroutine::sleep(3);
}
}
}
29 changes: 29 additions & 0 deletions src/process/test/testing/Process/Process2.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace SwoftTest\Process\Testing\Process;

use Swoft\Log\Helper\CLog;
use Swoft\Process\Annotation\Mapping\Process;
use Swoft\Process\Contract\ProcessInterface;
use Swoole\Coroutine;
use Swoole\Process\Pool;

/**
* Class Process2
* @Process(2)
*/
class Process2 implements ProcessInterface
{

/**
* @param Pool $pool
* @param int $workerId
*/
public function run(Pool $pool, int $workerId): void
{
while (true) {
CLog::info('worker-' . $workerId);
Coroutine::sleep(3);
}
}
}
29 changes: 29 additions & 0 deletions src/process/test/testing/Process/Process3.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace SwoftTest\Process\Testing\Process;

use Swoft\Log\Helper\CLog;
use Swoft\Process\Annotation\Mapping\Process;
use Swoft\Process\Contract\ProcessInterface;
use Swoole\Coroutine;
use Swoole\Process\Pool;

/**
* Class Process2
* @Process()
*/
class Process3 implements ProcessInterface
{

/**
* @param Pool $pool
* @param int $workerId
*/
public function run(Pool $pool, int $workerId): void
{
while (true) {
CLog::info('worker-' . $workerId);
Coroutine::sleep(3);
}
}
}
41 changes: 41 additions & 0 deletions src/process/test/unit/ProcessNumTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace SwoftTest\Process\Unit;

use Swoft\Process\ProcessRegister;
use SwoftTest\Process\Testing\Process\Process1;
use SwoftTest\Process\Testing\Process\Process2;
use SwoftTest\Process\Testing\Process\Process3;

/**
* Class ProcessNumTest
*
* @since 2.0
*/
class ProcessNumTest extends \PHPUnit\Framework\TestCase
{
function testNum(): void
{
$this->assertEquals(ProcessRegister::getWorkerNum(), 6);
}

function testProcessClassMap(): void
{
$workerNum = ProcessRegister::getWorkerNum();

$processClassMap = [];

for ($workerId = 0; $workerId < $workerNum; $workerId ++) {
$class = ProcessRegister::getProcess($workerId);
isset($processClassMap[$class]) ? $processClassMap[$class] ++ : $processClassMap[$class] = 1;
}

$this->assertArrayHasKey(Process1::class, $processClassMap);
$this->assertArrayHasKey(Process2::class, $processClassMap);
$this->assertArrayHasKey(Process3::class, $processClassMap);

$this->assertEquals($processClassMap[Process1::class], 3);
$this->assertEquals($processClassMap[Process2::class], 2);
$this->assertEquals($processClassMap[Process3::class], 1);
}
}