流媒体

Streams 利用 Streaming HTTP 协议,通过开放的流式 API 连接传递数据。 不像 REST API 所期望的那样,通过客户端应用程序的重复请求批量传送数据,而是在应用程序和 API 之间打开单个连接,每当出现新消息时,都会通过该连接发送新结果。 这导致可以支持非常高的吞吐量的低延迟交付机制。 有关详细信息,请参阅 https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data

Stream 允许使用Twitter API v1.1 过滤采样 实时推文。

StreamingClient 允许使用Twitter API v2 过滤采样 实时推文。

使用 Stream

使用 Stream 需要通过Twitter API credentials (Consumer Key, Consumer Secret, Access Token, Access Token Secret)初始化一个实例

import tweepy

stream = tweepy.Stream(
    "Consumer Key here", "Consumer Secret here",
    "Access Token here", "Access Token Secret here"
)

然后, 可以使用 Stream.filter()Stream.sample() 连接并运行一个流:

stream.filter(track=["Tweepy"])

从流中接收的数据被传递给 Stream.on_data() 。 此方法根据消息类型将数据发送到其他方法。 例如, 如果从流中接收到一条推文, 原始数据将被发送到 Stream.on_data(), 后者将构造一个对象 Status 并传递给 Stream.on_status()。 默认情况下,除了 Stream.on_data() 外, 从流中接收数据的其他方法只是简单地记录接收到的数据, 记录级别 l 取决于数据类型。

自定义流数据的处理, Stream 需要被继承。 例如,输出收到的每条推文ID:

class IDPrinter(tweepy.Stream):

    def on_status(self, status):
        print(status.id)


printer = IDPrinter(
    "Consumer Key here", "Consumer Secret here",
    "Access Token here", "Access Token Secret here"
)
printer.sample()

使用 StreamingClient

使用 StreamingClient,需要通过 Twitter API Bearer Token 初始化一个实例:

import tweepy

streaming_client = tweepy.StreamingClient("Bearer Token here")

然后, 可以使用 StreamingClient.sample() 连接并运行一个流:

streaming_client.sample()

使用 StreamingClient.add_rules() 可以在 StreamingClient.filter() 连接和运行一个过滤流之前添加规则:

streaming_client.add_rules(tweepy.StreamRule("Tweepy"))
streaming_client.filter()

StreamingClient.get_rules() 可以检索现有规则。 StreamingClient.delete_rules() 可以删除规则。

要了解如何构建规则,请参阅过滤流文档 Building rules for filtered stream

从流中接收的数据被传递给 StreamingClient.on_data(). 这个方法将数据传递给其他方法。 接收到的推文传递给 StreamingClient.on_tweet(), includes 数据传递给 StreamingClient.on_includes(), 错误传递给 StreamingClient.on_errors(), 匹配规则传递给 StreamingClient.on_matching_rules() 。 一个 StreamResponse 实例 传递给 StreamingClient.on_response() 。默认情况下, 只有 StreamingClient.on_response()DEBUG logging level 级别下,记录接收到的数据。

自定义流数据的处理, StreamingClient 需要被继承。例如,要打印收到的每条推文ID::

class IDPrinter(tweepy.StreamingClient):

    def on_tweet(self, tweet):
        print(tweet.id)


printer = IDPrinter("Bearer Token here")
printer.sample()

线程

Stream.filter(), Stream.sample(), StreamingClient.filter(), 和 StreamingClient.sample() 都有一个 线程 参数. 当设置为 True 时,流将在单独的 线程 中运行,该线程由方法调用返回。 例如:

thread = stream.filter(follow=[1072250532645998596], threaded=True)

或:

thread = streaming_client.sample(threaded=True)

错误处理

StreamStreamingClient 都有多种方法来处理流期间的错误。

当 Twitter 关闭流时调用 Stream.on_closed() / StreamingClient.on_closed()

Stream.on_connection_error() / StreamingClient.on_connection_error() 在流遇到连接错误时被调用。

Stream.on_request_error() / StreamingClient.on_request_error() 在尝试连接到流时遇到错误时调用。

当遇到这些错误,并且没有超过最大重试次数 max_retries 时, Stream / StreamingClient 的实例将尝试在适当的时间后重新连接流。 默认情况下,任何版本的这些方法都会记录错误。 如果需要自定义错误处理,可以在子类中继承它们。

class ConnectionTester(tweepy.Stream):

    def on_connection_error(self):
        self.disconnect()
class ConnectionTester(tweepy.StreamingClient):

    def on_connection_error(self):
        self.disconnect()

Stream.on_request_error() / StreamingClient.on_request_error() 也会传递遇到的 HTTP 状态码。Twitter API 的 HTTP 状态码请参考 https://developer.twitter.com/en/support/twitter-api/error-troubleshooting

Stream.on_exception() / StreamingClient.on_exception() 在发生未处理的异常时被调用。 这对流来说是致命的,默认情况下会记录一个异常。