<?php

namespace React\Tests\Stream;

use Clue\StreamFilter as Filter;
use React\Stream\WritableResourceStream;

class WritableResourceStreamTest extends TestCase
{
    /**
     * @covers React\Stream\WritableResourceStream::__construct
     * @doesNotPerformAssertions
     */
    public function testConstructor()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();

        new WritableResourceStream($stream, $loop);
    }

    /**
     * @covers React\Stream\WritableResourceStream::__construct
     * @doesNotPerformAssertions
     */
    public function testConstructorWithExcessiveMode()
    {
        // excessive flags are ignored for temp streams, so we have to use a file stream
        $name = tempnam(sys_get_temp_dir(), 'test');
        $stream = @fopen($name, 'w+eANYTHING');
        unlink($name);

        $loop = $this->createLoopMock();
        $buffer = new WritableResourceStream($stream, $loop);
        $buffer->close();
    }

    /**
     * @covers React\Stream\WritableResourceStream::__construct
     * @expectedException InvalidArgumentException
     */
    public function testConstructorThrowsIfNotAValidStreamResource()
    {
        $stream = null;
        $loop = $this->createLoopMock();

        new WritableResourceStream($stream, $loop);
    }

    /**
     * @covers React\Stream\WritableResourceStream::__construct
     * @expectedException InvalidArgumentException
     */
    public function testConstructorThrowsExceptionOnReadOnlyStream()
    {
        $stream = fopen('php://temp', 'r');
        $loop = $this->createLoopMock();

        new WritableResourceStream($stream, $loop);
    }

    /**
     * @covers React\Stream\WritableResourceStream::__construct
     * @expectedException InvalidArgumentException
     */
    public function testConstructorThrowsExceptionOnReadOnlyStreamWithExcessiveMode()
    {
        // excessive flags are ignored for temp streams, so we have to use a file stream
        $name = tempnam(sys_get_temp_dir(), 'test');
        $stream = fopen($name, 'reANYTHING');
        unlink($name);

        $loop = $this->createLoopMock();
        new WritableResourceStream($stream, $loop);
    }

    /**
     * @covers React\Stream\WritableResourceStream::__construct
     * @expectedException RuntimeException
     */
    public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking()
    {
        if (!in_array('blocking', stream_get_wrappers())) {
            stream_wrapper_register('blocking', 'React\Tests\Stream\EnforceBlockingWrapper');
        }

        $stream = fopen('blocking://test', 'r+');
        $loop = $this->createLoopMock();

        new WritableResourceStream($stream, $loop);
    }

    /**
     * @covers React\Stream\WritableResourceStream::write
     * @covers React\Stream\WritableResourceStream::handleWrite
     */
    public function testWrite()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createWriteableLoopMock();

        $buffer = new WritableResourceStream($stream, $loop);
        $buffer->on('error', $this->expectCallableNever());

        $buffer->write("foobar\n");
        rewind($stream);
        $this->assertSame("foobar\n", fread($stream, 1024));
    }

    /**
     * @covers React\Stream\WritableResourceStream::write
     */
    public function testWriteWithDataDoesAddResourceToLoop()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();
        $loop->expects($this->once())->method('addWriteStream')->with($this->equalTo($stream));

        $buffer = new WritableResourceStream($stream, $loop);

        $buffer->write("foobar\n");
    }

    /**
     * @covers React\Stream\WritableResourceStream::write
     * @covers React\Stream\WritableResourceStream::handleWrite
     */
    public function testEmptyWriteDoesNotAddToLoop()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();
        $loop->expects($this->never())->method('addWriteStream');

        $buffer = new WritableResourceStream($stream, $loop);

        $buffer->write("");
        $buffer->write(null);
    }

    /**
     * @covers React\Stream\WritableResourceStream::write
     * @covers React\Stream\WritableResourceStream::handleWrite
     */
    public function testWriteReturnsFalseWhenWritableResourceStreamIsFull()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createWriteableLoopMock();
        $loop->preventWrites = true;

        $buffer = new WritableResourceStream($stream, $loop, 4);
        $buffer->on('error', $this->expectCallableNever());

        $this->assertTrue($buffer->write("foo"));
        $loop->preventWrites = false;
        $this->assertFalse($buffer->write("bar\n"));
    }

    /**
     * @covers React\Stream\WritableResourceStream::write
     */
    public function testWriteReturnsFalseWhenWritableResourceStreamIsExactlyFull()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();

        $buffer = new WritableResourceStream($stream, $loop, 3);

        $this->assertFalse($buffer->write("foo"));
    }

    /**
     * @covers React\Stream\WritableResourceStream::write
     * @covers React\Stream\WritableResourceStream::handleWrite
     */
    public function testWriteDetectsWhenOtherSideIsClosed()
    {
        list($a, $b) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

        $loop = $this->createWriteableLoopMock();

        $buffer = new WritableResourceStream($a, $loop, 4);
        $buffer->on('error', $this->expectCallableOnce());

        fclose($b);

        $buffer->write("foo");
    }

    /**
     * @covers React\Stream\WritableResourceStream::write
     * @covers React\Stream\WritableResourceStream::handleWrite
     */
    public function testEmitsDrainAfterWriteWhichExceedsBuffer()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();

        $buffer = new WritableResourceStream($stream, $loop, 2);
        $buffer->on('error', $this->expectCallableNever());
        $buffer->on('drain', $this->expectCallableOnce());

        $buffer->write("foo");
        $buffer->handleWrite();
    }

    /**
     * @covers React\Stream\WritableResourceStream::write
     * @covers React\Stream\WritableResourceStream::handleWrite
     */
    public function testWriteInDrain()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();

        $buffer = new WritableResourceStream($stream, $loop, 2);
        $buffer->on('error', $this->expectCallableNever());

        $buffer->once('drain', function () use ($buffer) {
            $buffer->write("bar\n");
            $buffer->handleWrite();
        });

        $this->assertFalse($buffer->write("foo\n"));
        $buffer->handleWrite();

        fseek($stream, 0);
        $this->assertSame("foo\nbar\n", stream_get_contents($stream));
    }

    /**
     * @covers React\Stream\WritableResourceStream::write
     * @covers React\Stream\WritableResourceStream::handleWrite
     */
    public function testDrainAfterWrite()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();

        $buffer = new WritableResourceStream($stream, $loop, 2);

        $buffer->on('drain', $this->expectCallableOnce());

        $buffer->write("foo");
        $buffer->handleWrite();
    }

    /**
     * @covers React\Stream\WritableResourceStream::handleWrite
     */
    public function testDrainAfterWriteWillRemoveResourceFromLoopWithoutClosing()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();
        $loop->expects($this->once())->method('removeWriteStream')->with($stream);

        $buffer = new WritableResourceStream($stream, $loop, 2);

        $buffer->on('drain', $this->expectCallableOnce());

        $buffer->on('close', $this->expectCallableNever());

        $buffer->write("foo");
        $buffer->handleWrite();
    }

    /**
     * @covers React\Stream\WritableResourceStream::handleWrite
     */
    public function testClosingDuringDrainAfterWriteWillRemoveResourceFromLoopOnceAndClose()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();
        $loop->expects($this->once())->method('removeWriteStream')->with($stream);

        $buffer = new WritableResourceStream($stream, $loop, 2);

        $buffer->on('drain', function () use ($buffer) {
            $buffer->close();
        });

        $buffer->on('close', $this->expectCallableOnce());

        $buffer->write("foo");
        $buffer->handleWrite();
    }

    /**
     * @covers React\Stream\WritableResourceStream::end
     */
    public function testEndWithoutDataClosesImmediatelyIfWritableResourceStreamIsEmpty()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();

        $buffer = new WritableResourceStream($stream, $loop);
        $buffer->on('error', $this->expectCallableNever());
        $buffer->on('close', $this->expectCallableOnce());

        $this->assertTrue($buffer->isWritable());
        $buffer->end();
        $this->assertFalse($buffer->isWritable());
    }

    /**
     * @covers React\Stream\WritableResourceStream::end
     */
    public function testEndWithoutDataDoesNotCloseIfWritableResourceStreamIsFull()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();

        $buffer = new WritableResourceStream($stream, $loop);
        $buffer->on('error', $this->expectCallableNever());
        $buffer->on('close', $this->expectCallableNever());

        $buffer->write('foo');

        $this->assertTrue($buffer->isWritable());
        $buffer->end();
        $this->assertFalse($buffer->isWritable());
    }

    /**
     * @covers React\Stream\WritableResourceStream::end
     */
    public function testEndWithDataClosesImmediatelyIfWritableResourceStreamFlushes()
    {
        $stream = fopen('php://temp', 'r+');
        $filterBuffer = '';
        $loop = $this->createLoopMock();

        $buffer = new WritableResourceStream($stream, $loop);
        $buffer->on('error', $this->expectCallableNever());
        $buffer->on('close', $this->expectCallableOnce());

        Filter\append($stream, function ($chunk) use (&$filterBuffer) {
            $filterBuffer .= $chunk;
            return $chunk;
        });

        $this->assertTrue($buffer->isWritable());
        $buffer->end('final words');
        $this->assertFalse($buffer->isWritable());

        $buffer->handleWrite();
        $this->assertSame('final words', $filterBuffer);
    }

    /**
     * @covers React\Stream\WritableResourceStream::end
     */
    public function testEndWithDataDoesNotCloseImmediatelyIfWritableResourceStreamIsFull()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();

        $buffer = new WritableResourceStream($stream, $loop);
        $buffer->on('error', $this->expectCallableNever());
        $buffer->on('close', $this->expectCallableNever());

        $buffer->write('foo');

        $this->assertTrue($buffer->isWritable());
        $buffer->end('final words');
        $this->assertFalse($buffer->isWritable());

        rewind($stream);
        $this->assertSame('', stream_get_contents($stream));
    }

    /**
     * @covers React\Stream\WritableResourceStream::isWritable
     * @covers React\Stream\WritableResourceStream::close
     */
    public function testClose()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();

        $buffer = new WritableResourceStream($stream, $loop);
        $buffer->on('error', $this->expectCallableNever());
        $buffer->on('close', $this->expectCallableOnce());

        $this->assertTrue($buffer->isWritable());
        $buffer->close();
        $this->assertFalse($buffer->isWritable());

        $this->assertEquals(array(), $buffer->listeners('close'));
    }

    /**
     * @covers React\Stream\WritableResourceStream::close
     */
    public function testClosingAfterWriteRemovesStreamFromLoop()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();
        $buffer = new WritableResourceStream($stream, $loop);

        $loop->expects($this->once())->method('removeWriteStream')->with($stream);

        $buffer->write('foo');
        $buffer->close();
    }

    /**
     * @covers React\Stream\WritableResourceStream::close
     */
    public function testClosingWithoutWritingDoesNotRemoveStreamFromLoop()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();
        $buffer = new WritableResourceStream($stream, $loop);

        $loop->expects($this->never())->method('removeWriteStream');

        $buffer->close();
    }

    /**
     * @covers React\Stream\WritableResourceStream::close
     */
    public function testDoubleCloseWillEmitOnlyOnce()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createLoopMock();

        $buffer = new WritableResourceStream($stream, $loop);
        $buffer->on('close', $this->expectCallableOnce());

        $buffer->close();
        $buffer->close();
    }

    /**
     * @covers React\Stream\WritableResourceStream::write
     * @covers React\Stream\WritableResourceStream::close
     */
    public function testWritingToClosedWritableResourceStreamShouldNotWriteToStream()
    {
        $stream = fopen('php://temp', 'r+');
        $filterBuffer = '';
        $loop = $this->createLoopMock();

        $buffer = new WritableResourceStream($stream, $loop);

        Filter\append($stream, function ($chunk) use (&$filterBuffer) {
            $filterBuffer .= $chunk;
            return $chunk;
        });

        $buffer->close();

        $buffer->write('foo');

        $buffer->handleWrite();
        $this->assertSame('', $filterBuffer);
    }

    /**
     * @covers React\Stream\WritableResourceStream::handleWrite
     */
    public function testErrorWhenStreamResourceIsInvalid()
    {
        $stream = fopen('php://temp', 'r+');
        $loop = $this->createWriteableLoopMock();

        $error = null;

        $buffer = new WritableResourceStream($stream, $loop);
        $buffer->on('error', function ($message) use (&$error) {
            $error = $message;
        });

        // invalidate stream resource
        fclose($stream);

        $buffer->write('Attempting to write to bad stream');

        $this->assertInstanceOf('Exception', $error);

        // the error messages differ between PHP versions, let's just check substrings
        $this->assertContains('Unable to write to stream: ', $error->getMessage());
        $this->assertContains(' not a valid stream resource', $error->getMessage(), '', true);
    }

    public function testWritingToClosedStream()
    {
        if ('Darwin' === PHP_OS) {
            $this->markTestSkipped('OS X issue with shutting down pair for writing');
        }

        list($a, $b) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
        $loop = $this->createLoopMock();

        $error = null;

        $buffer = new WritableResourceStream($a, $loop);
        $buffer->on('error', function($message) use (&$error) {
            $error = $message;
        });

        $buffer->write('foo');
        $buffer->handleWrite();
        stream_socket_shutdown($b, STREAM_SHUT_RD);
        stream_socket_shutdown($a, STREAM_SHUT_RD);
        $buffer->write('bar');
        $buffer->handleWrite();

        $this->assertInstanceOf('Exception', $error);
        $this->assertSame('Unable to write to stream: fwrite(): send of 3 bytes failed with errno=32 Broken pipe', $error->getMessage());
    }

    private function createWriteableLoopMock()
    {
        $loop = $this->createLoopMock();
        $loop->preventWrites = false;
        $loop
            ->expects($this->any())
            ->method('addWriteStream')
            ->will($this->returnCallback(function ($stream, $listener) use ($loop) {
                if (!$loop->preventWrites) {
                    call_user_func($listener, $stream);
                }
            }));

        return $loop;
    }

    private function createLoopMock()
    {
        return $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
    }
}