optimize: swow.php

This commit is contained in:
cexll 2022-04-06 09:27:09 +08:00
parent ee8006e38c
commit 6cf982731b
10 changed files with 185 additions and 622 deletions

View File

@ -7,7 +7,7 @@ ini_set('memory_limit', '1G');
date_default_timezone_set('Asia/Shanghai');
!defined('BASE_PATH') && define('BASE_PATH', dirname(__DIR__, 1));
! defined('BASE_PATH') && define('BASE_PATH', dirname(__DIR__, 1));
require BASE_PATH . '/vendor/autoload.php';
@ -15,14 +15,103 @@ use App\Container\Logger;
use App\Error;
use App\Vega;
use Dotenv\Dotenv;
use App\Container\Swow\Http\Server;
use Swow\CoroutineException;
use Swow\Errno;
use Swow\Http\ResponseException;
use Swow\Http\Server as HttpServer;
use Swow\Socket;
use Swow\SocketException;
use function Swow\Sync\waitAll;
Dotenv::createUnsafeImmutable(__DIR__ . '/../', '.env')->load();
define("APP_DEBUG", env('APP_DEBUG'));
Error::register();
class SwowServer extends HttpServer
{
/**
* @var string|null
*/
public $host = null;
/**
* @var int|null
*/
public $port = null;
/**
* @var callable
*/
protected $handler;
/**
* @param string $name
* @param int $port
* @param int $flags
* @return static
*/
public function bind(string $name, int $port = 0, int $flags = Socket::BIND_FLAG_NONE): static
{
$this->host = $name;
$this->port = $port;
parent::bind($name, $port, $flags);
return $this;
}
public function handle(callable $callable)
{
$this->handler = $callable;
return $this;
}
public function start()
{
$this->listen();
\Swow\Coroutine::run(function () {
while (true) {
try {
$connection = $this->acceptConnection();
\Swow\Coroutine::run(function () use ($connection) {
try {
while (true) {
$request = null;
try {
$request = $connection->recvHttpRequest();
$handler = $this->handler;
$handler($request, $connection);
} catch (ResponseException $exception) {
$connection->error($exception->getCode(), $exception->getMessage());
}
if (!$request || !$request->getKeepAlive()) {
break;
}
}
} catch (\Throwable $exception) {
Logger::instance()->error((string)$exception);
} finally {
$connection->close();
}
});
} catch (SocketException|CoroutineException $exception) {
if (in_array($exception->getCode(), [Errno::EMFILE, Errno::ENFILE, Errno::ENOMEM], true)) {
Logger::instance()->warning('Socket resources have been exhausted.');
sleep(1);
} else {
Logger::instance()->error((string)$exception);
break;
}
} catch (\Throwable $exception) {
Logger::instance()->error((string)$exception);
}
}
});
waitAll();
}
}
$vega = Vega::new();
$server = new Server();
$server = new SwowServer();
$host = '0.0.0.0';
$port = 9501;
$server->bind($host, $port)->handle($vega->handler());

View File

@ -1,136 +0,0 @@
<?php
namespace App\Container\Swow;
use App\Container\Swow\Exception\CoroutineDestroyedException;
use ArrayObject;
use Swow\Coroutine as SwowCo;
class Coroutine extends SwowCo
{
/**
* @var ArrayObject
*/
protected $context;
/**
* @var int
*/
protected $parentId;
/**
* @var callable[]
*/
protected $deferCallbacks = [];
/**
* @var ArrayObject|null
*/
protected static $mainContext = null;
public function __construct(callable $callable)
{
parent::__construct($callable);
$this->context = new ArrayObject();
$this->parentId = static::getCurrent()->getId();
}
public function __destruct()
{
while (!empty($this->deferCallbacks)) {
array_shift($this->deferCallbacks)();
}
}
/**
* @param ...$data
* @return static
*/
public function execute(...$data)
{
$this->resume(...$data);
return $this;
}
public function getContext()
{
return $this->context;
}
public function getParentId(): int
{
return $this->parentId;
}
public function addDefer(callable $callable)
{
array_unshift($this->deferCallbacks, $callable);
}
/**
* @param callable $callable
* @param ...$data
* @return static
*/
public static function create(callable $callable, ...$data)
{
$coroutine = new self($callable);
$coroutine->resume(...$data);
return $coroutine;
}
public static function id(): int
{
return static::getCurrent()->getId();
}
public static function pid(?int $id = null): int
{
if ($id === null) {
$coroutine = static::getCurrent();
if ($coroutine instanceof static) {
return static::getCurrent()->getParentId();
}
return 0;
}
$coroutine = static::get($id);
if ($coroutine === null) {
throw new CoroutineDestroyedException(sprintf('Coroutine #%d has been destroyed.', $id));
}
return $coroutine->getParentId();
}
public static function set(array $config): void
{
}
/**
* @param int|null $id
* @return ArrayObject|null
*/
public static function getContextFor(?int $id = null): ?ArrayObject
{
$coroutine = is_null($id) ? static::getCurrent() : static::get($id);
if ($coroutine === null) {
return null;
}
if ($coroutine instanceof static) {
return $coroutine->getContext();
}
if (static::$mainContext === null) {
static::$mainContext = new ArrayObject();
}
return static::$mainContext;
}
public static function defer(callable $callable): void
{
$coroutine = static::getCurrent();
if ($coroutine instanceof static) {
$coroutine->addDefer($callable);
}
}
}

View File

@ -1,8 +0,0 @@
<?php
namespace App\Container\Swow\Exception;
class CoroutineDestroyedException extends \RuntimeException
{
}

View File

@ -1,96 +0,0 @@
<?php
namespace App\Container\Swow\Http;
use App\Container\Logger;
use App\Container\Swow\Coroutine;
use Swow\CoroutineException;
use Swow\Errno;
use Swow\Http\ResponseException;
use Swow\Http\Server as HttpServer;
use Swow\Socket;
use Swow\SocketException;
use function Swow\Sync\waitAll;
class Server extends HttpServer
{
/**
* @var string|null
*/
public $host = null;
/**
* @var int|null
*/
public $port = null;
/**
* @var callable
*/
protected $handler;
/**
* @param string $name
* @param int $port
* @param int $flags
* @return static
*/
public function bind(string $name, int $port = 0, int $flags = Socket::BIND_FLAG_NONE): static
{
$this->host = $name;
$this->port = $port;
parent::bind($name, $port, $flags);
return $this;
}
public function handle(callable $callable)
{
$this->handler = $callable;
return $this;
}
public function start()
{
$this->listen();
Coroutine::create(function () {
while (true) {
try {
$connection = $this->acceptConnection();
Coroutine::create(function () use ($connection) {
try {
while (true) {
$request = null;
try {
$request = $connection->recvHttpRequest();
$handler = $this->handler;
$handler($request, $connection);
} catch (ResponseException $exception) {
$connection->error($exception->getCode(), $exception->getMessage());
}
if (!$request || !$request->getKeepAlive()) {
break;
}
}
} catch (\Throwable $exception) {
Logger::instance()->error((string)$exception);
} finally {
$connection->close();
}
});
} catch (SocketException|CoroutineException $exception) {
if (in_array($exception->getCode(), [Errno::EMFILE, Errno::ENFILE, Errno::ENOMEM], true)) {
Logger::instance()->warning('Socket resources have been exhausted.');
sleep(1);
} else {
Logger::instance()->error((string)$exception);
break;
}
} catch (\Throwable $exception) {
Logger::instance()->error((string)$exception);
}
}
});
waitAll();
}
}

View File

@ -1,66 +0,0 @@
<?php
namespace App\Container\Swow;
use Psr\Log\LoggerInterface;
use Swow\Socket;
class Swow extends Socket
{
/**
* @var string|null
*/
public $host = null;
/**
* @var int|null
*/
public $port = null;
/**
* @var callable
*/
protected $handler;
/**
* @var LoggerInterface
*/
protected $logger;
/**
* @param LoggerInterface $logger
* @param int $type
*/
public function __construct(LoggerInterface $logger, int $type = Socket::TYPE_TCP)
{
parent::__construct($type);
}
/**
* @param string $name
* @param int $port
* @param int $flags
* @return static
*/
public function bind(string $name, int $port = 0, int $flags = Socket::BIND_FLAG_NONE): static
{
$this->host = $name;
$this->port = $port;
parent::bind($name, $port, $flags);
return $this;
}
public function handle(callable $callable)
{
$this->handler = $callable;
return $this;
}
public function start()
{
$this->listen();
while (true) {
Coroutine::create($this->handler, $this->accept());
}
}
}

View File

@ -7,7 +7,7 @@ ini_set('memory_limit', '1G');
date_default_timezone_set('Asia/Shanghai');
!defined('BASE_PATH') && define('BASE_PATH', dirname(__DIR__, 1));
! defined('BASE_PATH') && define('BASE_PATH', dirname(__DIR__, 1));
require BASE_PATH . '/vendor/autoload.php';
@ -15,14 +15,103 @@ use App\Container\Logger;
use App\Error;
use App\Vega;
use Dotenv\Dotenv;
use App\Container\Swow\Http\Server;
use Swow\CoroutineException;
use Swow\Errno;
use Swow\Http\ResponseException;
use Swow\Http\Server as HttpServer;
use Swow\Socket;
use Swow\SocketException;
use function Swow\Sync\waitAll;
Dotenv::createUnsafeImmutable(__DIR__ . '/../', '.env')->load();
define("APP_DEBUG", env('APP_DEBUG'));
Error::register();
class SwowServer extends HttpServer
{
/**
* @var string|null
*/
public $host = null;
/**
* @var int|null
*/
public $port = null;
/**
* @var callable
*/
protected $handler;
/**
* @param string $name
* @param int $port
* @param int $flags
* @return static
*/
public function bind(string $name, int $port = 0, int $flags = Socket::BIND_FLAG_NONE): static
{
$this->host = $name;
$this->port = $port;
parent::bind($name, $port, $flags);
return $this;
}
public function handle(callable $callable)
{
$this->handler = $callable;
return $this;
}
public function start()
{
$this->listen();
\Swow\Coroutine::run(function () {
while (true) {
try {
$connection = $this->acceptConnection();
\Swow\Coroutine::run(function () use ($connection) {
try {
while (true) {
$request = null;
try {
$request = $connection->recvHttpRequest();
$handler = $this->handler;
$handler($request, $connection);
} catch (ResponseException $exception) {
$connection->error($exception->getCode(), $exception->getMessage());
}
if (!$request || !$request->getKeepAlive()) {
break;
}
}
} catch (\Throwable $exception) {
Logger::instance()->error((string)$exception);
} finally {
$connection->close();
}
});
} catch (SocketException|CoroutineException $exception) {
if (in_array($exception->getCode(), [Errno::EMFILE, Errno::ENFILE, Errno::ENOMEM], true)) {
Logger::instance()->warning('Socket resources have been exhausted.');
sleep(1);
} else {
Logger::instance()->error((string)$exception);
break;
}
} catch (\Throwable $exception) {
Logger::instance()->error((string)$exception);
}
}
});
waitAll();
}
}
$vega = Vega::new();
$server = new Server();
$server = new SwowServer();
$host = '0.0.0.0';
$port = 9501;
$server->bind($host, $port)->handle($vega->handler());
@ -41,7 +130,4 @@ printf("PHP Version: %s\n", PHP_VERSION);
printf("Swow Version: %s\n", '0.1.0');
printf("Listen Addr: http://%s:%d\n", $host, $port);
Logger::instance()->info('Start swow coroutine server');
$server->start();
$server->start();

View File

@ -1,136 +0,0 @@
<?php
namespace App\Container\Swow;
use App\Container\Swow\Exception\CoroutineDestroyedException;
use ArrayObject;
use Swow\Coroutine as SwowCo;
class Coroutine extends SwowCo
{
/**
* @var ArrayObject
*/
protected $context;
/**
* @var int
*/
protected $parentId;
/**
* @var callable[]
*/
protected $deferCallbacks = [];
/**
* @var ArrayObject|null
*/
protected static $mainContext = null;
public function __construct(callable $callable)
{
parent::__construct($callable);
$this->context = new ArrayObject();
$this->parentId = static::getCurrent()->getId();
}
public function __destruct()
{
while (!empty($this->deferCallbacks)) {
array_shift($this->deferCallbacks)();
}
}
/**
* @param ...$data
* @return static
*/
public function execute(...$data)
{
$this->resume(...$data);
return $this;
}
public function getContext()
{
return $this->context;
}
public function getParentId(): int
{
return $this->parentId;
}
public function addDefer(callable $callable)
{
array_unshift($this->deferCallbacks, $callable);
}
/**
* @param callable $callable
* @param ...$data
* @return static
*/
public static function create(callable $callable, ...$data)
{
$coroutine = new self($callable);
$coroutine->resume(...$data);
return $coroutine;
}
public static function id(): int
{
return static::getCurrent()->getId();
}
public static function pid(?int $id = null): int
{
if ($id === null) {
$coroutine = static::getCurrent();
if ($coroutine instanceof static) {
return static::getCurrent()->getParentId();
}
return 0;
}
$coroutine = static::get($id);
if ($coroutine === null) {
throw new CoroutineDestroyedException(sprintf('Coroutine #%d has been destroyed.', $id));
}
return $coroutine->getParentId();
}
public static function set(array $config): void
{
}
/**
* @param int|null $id
* @return ArrayObject|null
*/
public static function getContextFor(?int $id = null): ?ArrayObject
{
$coroutine = is_null($id) ? static::getCurrent() : static::get($id);
if ($coroutine === null) {
return null;
}
if ($coroutine instanceof static) {
return $coroutine->getContext();
}
if (static::$mainContext === null) {
static::$mainContext = new ArrayObject();
}
return static::$mainContext;
}
public static function defer(callable $callable): void
{
$coroutine = static::getCurrent();
if ($coroutine instanceof static) {
$coroutine->addDefer($callable);
}
}
}

View File

@ -1,8 +0,0 @@
<?php
namespace App\Container\Swow\Exception;
class CoroutineDestroyedException extends \RuntimeException
{
}

View File

@ -1,96 +0,0 @@
<?php
namespace App\Container\Swow\Http;
use App\Container\Logger;
use App\Container\Swow\Coroutine;
use Swow\CoroutineException;
use Swow\Errno;
use Swow\Http\ResponseException;
use Swow\Http\Server as HttpServer;
use Swow\Socket;
use Swow\SocketException;
use function Swow\Sync\waitAll;
class Server extends HttpServer
{
/**
* @var string|null
*/
public $host = null;
/**
* @var int|null
*/
public $port = null;
/**
* @var callable
*/
protected $handler;
/**
* @param string $name
* @param int $port
* @param int $flags
* @return static
*/
public function bind(string $name, int $port = 0, int $flags = Socket::BIND_FLAG_NONE): static
{
$this->host = $name;
$this->port = $port;
parent::bind($name, $port, $flags);
return $this;
}
public function handle(callable $callable)
{
$this->handler = $callable;
return $this;
}
public function start()
{
$this->listen();
Coroutine::create(function () {
while (true) {
try {
$connection = $this->acceptConnection();
Coroutine::create(function () use ($connection) {
try {
while (true) {
$request = null;
try {
$request = $connection->recvHttpRequest();
$handler = $this->handler;
$handler($request, $connection);
} catch (ResponseException $exception) {
$connection->error($exception->getCode(), $exception->getMessage());
}
if (!$request || !$request->getKeepAlive()) {
break;
}
}
} catch (\Throwable $exception) {
Logger::instance()->error((string)$exception);
} finally {
$connection->close();
}
});
} catch (SocketException|CoroutineException $exception) {
if (in_array($exception->getCode(), [Errno::EMFILE, Errno::ENFILE, Errno::ENOMEM], true)) {
Logger::instance()->warning('Socket resources have been exhausted.');
sleep(1);
} else {
Logger::instance()->error((string)$exception);
break;
}
} catch (\Throwable $exception) {
Logger::instance()->error((string)$exception);
}
}
});
waitAll();
}
}

View File

@ -1,66 +0,0 @@
<?php
namespace App\Container\Swow;
use Psr\Log\LoggerInterface;
use Swow\Socket;
class Swow extends Socket
{
/**
* @var string|null
*/
public $host = null;
/**
* @var int|null
*/
public $port = null;
/**
* @var callable
*/
protected $handler;
/**
* @var LoggerInterface
*/
protected $logger;
/**
* @param LoggerInterface $logger
* @param int $type
*/
public function __construct(LoggerInterface $logger, int $type = Socket::TYPE_TCP)
{
parent::__construct($type);
}
/**
* @param string $name
* @param int $port
* @param int $flags
* @return static
*/
public function bind(string $name, int $port = 0, int $flags = Socket::BIND_FLAG_NONE): static
{
$this->host = $name;
$this->port = $port;
parent::bind($name, $port, $flags);
return $this;
}
public function handle(callable $callable)
{
$this->handler = $callable;
return $this;
}
public function start()
{
$this->listen();
while (true) {
Coroutine::create($this->handler, $this->accept());
}
}
}