<?php

namespace React\Tests\Stream;

use React\Stream\ThroughStream;

/**
 * @covers React\Stream\ThroughStream
 */
class ThroughStreamTest extends TestCase
{
    /**
     * @test
     * @expectedException InvalidArgumentException
     */
    public function itShouldRejectInvalidCallback()
    {
        new ThroughStream(123);
    }

    /** @test */
    public function itShouldReturnTrueForAnyDataWrittenToIt()
    {
        $through = new ThroughStream();
        $ret = $through->write('foo');

        $this->assertTrue($ret);
    }

    /** @test */
    public function itShouldEmitAnyDataWrittenToIt()
    {
        $through = new ThroughStream();
        $through->on('data', $this->expectCallableOnceWith('foo'));
        $through->write('foo');
    }

    /** @test */
    public function itShouldEmitAnyDataWrittenToItPassedThruFunction()
    {
        $through = new ThroughStream('strtoupper');
        $through->on('data', $this->expectCallableOnceWith('FOO'));
        $through->write('foo');
    }

    /** @test */
    public function itShouldEmitAnyDataWrittenToItPassedThruCallback()
    {
        $through = new ThroughStream('strtoupper');
        $through->on('data', $this->expectCallableOnceWith('FOO'));
        $through->write('foo');
    }

    /** @test */
    public function itShouldEmitErrorAndCloseIfCallbackThrowsException()
    {
        $through = new ThroughStream(function () {
            throw new \RuntimeException();
        });
        $through->on('error', $this->expectCallableOnce());
        $through->on('close', $this->expectCallableOnce());
        $through->on('data', $this->expectCallableNever());
        $through->on('end', $this->expectCallableNever());

        $through->write('foo');

        $this->assertFalse($through->isReadable());
        $this->assertFalse($through->isWritable());
    }

    /** @test */
    public function itShouldEmitErrorAndCloseIfCallbackThrowsExceptionOnEnd()
    {
        $through = new ThroughStream(function () {
            throw new \RuntimeException();
        });
        $through->on('error', $this->expectCallableOnce());
        $through->on('close', $this->expectCallableOnce());
        $through->on('data', $this->expectCallableNever());
        $through->on('end', $this->expectCallableNever());

        $through->end('foo');

        $this->assertFalse($through->isReadable());
        $this->assertFalse($through->isWritable());
    }

    /** @test */
    public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused()
    {
        $through = new ThroughStream();
        $through->pause();
        $ret = $through->write('foo');

        $this->assertFalse($ret);
    }

    /** @test */
    public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWhenPaused()
    {
        $through = new ThroughStream();
        $through->pause();
        $through->write('foo');

        $through->on('drain', $this->expectCallableOnce());
        $through->resume();
    }

    /** @test */
    public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause()
    {
        $through = new ThroughStream();
        $through->on('drain', $this->expectCallableNever());
        $through->pause();
        $through->resume();
        $ret = $through->write('foo');

        $this->assertTrue($ret);
    }

    /** @test */
    public function pipingStuffIntoItShouldWork()
    {
        $readable = new ThroughStream();

        $through = new ThroughStream();
        $through->on('data', $this->expectCallableOnceWith('foo'));

        $readable->pipe($through);
        $readable->emit('data', array('foo'));
    }

    /** @test */
    public function endShouldEmitEndAndClose()
    {
        $through = new ThroughStream();
        $through->on('data', $this->expectCallableNever());
        $through->on('end', $this->expectCallableOnce());
        $through->on('close', $this->expectCallableOnce());
        $through->end();
    }

    /** @test */
    public function endShouldCloseTheStream()
    {
        $through = new ThroughStream();
        $through->on('data', $this->expectCallableNever());
        $through->end();

        $this->assertFalse($through->isReadable());
        $this->assertFalse($through->isWritable());
    }

    /** @test */
    public function endShouldWriteDataBeforeClosing()
    {
        $through = new ThroughStream();
        $through->on('data', $this->expectCallableOnceWith('foo'));
        $through->end('foo');

        $this->assertFalse($through->isReadable());
        $this->assertFalse($through->isWritable());
    }

    /** @test */
    public function endTwiceShouldOnlyEmitOnce()
    {
        $through = new ThroughStream();
        $through->on('data', $this->expectCallableOnce('first'));
        $through->end('first');
        $through->end('ignored');
    }

    /** @test */
    public function writeAfterEndShouldReturnFalse()
    {
        $through = new ThroughStream();
        $through->on('data', $this->expectCallableNever());
        $through->end();

        $this->assertFalse($through->write('foo'));
    }

    /** @test */
    public function writeDataWillCloseStreamShouldReturnFalse()
    {
        $through = new ThroughStream();
        $through->on('data', array($through, 'close'));

        $this->assertFalse($through->write('foo'));
    }

    /** @test */
    public function writeDataToPausedShouldReturnFalse()
    {
        $through = new ThroughStream();
        $through->pause();

        $this->assertFalse($through->write('foo'));
    }

    /** @test */
    public function writeDataToResumedShouldReturnTrue()
    {
        $through = new ThroughStream();
        $through->pause();
        $through->resume();

        $this->assertTrue($through->write('foo'));
    }

    /** @test */
    public function itShouldBeReadableByDefault()
    {
        $through = new ThroughStream();
        $this->assertTrue($through->isReadable());
    }

    /** @test */
    public function itShouldBeWritableByDefault()
    {
        $through = new ThroughStream();
        $this->assertTrue($through->isWritable());
    }

    /** @test */
    public function closeShouldCloseOnce()
    {
        $through = new ThroughStream();

        $through->on('close', $this->expectCallableOnce());

        $through->close();

        $this->assertFalse($through->isReadable());
        $this->assertFalse($through->isWritable());
    }

    /** @test */
    public function doubleCloseShouldCloseOnce()
    {
        $through = new ThroughStream();

        $through->on('close', $this->expectCallableOnce());

        $through->close();
        $through->close();

        $this->assertFalse($through->isReadable());
        $this->assertFalse($through->isWritable());
    }

    /** @test */
    public function pipeShouldPipeCorrectly()
    {
        $output = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
        $output->expects($this->any())->method('isWritable')->willReturn(True);
        $output
            ->expects($this->once())
            ->method('write')
            ->with('foo');

        $through = new ThroughStream();
        $through->pipe($output);
        $through->write('foo');
    }
}