流媒体
Streams 利用 Streaming HTTP 协议,通过开放的流式 API 连接传递数据。 不像 REST API 所期望的那样,通过客户端应用程序的重复请求批量传送数据,而是在应用程序和 API 之间打开单个连接,每当出现新消息时,都会通过该连接发送新结果。 这导致可以支持非常高的吞吐量的低延迟交付机制。 有关详细信息,请参阅 https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data
Stream
允许使用Twitter API v1.1 过滤 和
采样 实时推文。
StreamingClient
允许使用Twitter API v2 过滤 和
采样 实时推文。
使用 Stream
使用 Stream
需要通过Twitter
API credentials (Consumer Key, Consumer Secret, Access Token, Access Token
Secret)初始化一个实例
import tweepy
stream = tweepy.Stream(
"Consumer Key here", "Consumer Secret here",
"Access Token here", "Access Token Secret here"
)
然后, 可以使用 Stream.filter()
和 Stream.sample()
连接并运行一个流:
stream.filter(track=["Tweepy"])
从流中接收的数据被传递给 Stream.on_data()
。 此方法根据消息类型将数据发送到其他方法。
例如, 如果从流中接收到一条推文, 原始数据将被发送到
Stream.on_data()
, 后者将构造一个对象 Status
并传递给 Stream.on_status()
。 默认情况下,除了
Stream.on_data()
外, 从流中接收数据的其他方法只是简单地记录接收到的数据, 记录级别 l 取决于数据类型。
自定义流数据的处理, Stream
需要被继承。 例如,输出收到的每条推文ID:
class IDPrinter(tweepy.Stream):
def on_status(self, status):
print(status.id)
printer = IDPrinter(
"Consumer Key here", "Consumer Secret here",
"Access Token here", "Access Token Secret here"
)
printer.sample()
使用 StreamingClient
使用 StreamingClient
,需要通过 Twitter API Bearer Token 初始化一个实例:
import tweepy
streaming_client = tweepy.StreamingClient("Bearer Token here")
然后, 可以使用 StreamingClient.sample()
连接并运行一个流:
streaming_client.sample()
使用 StreamingClient.add_rules()
可以在
StreamingClient.filter()
连接和运行一个过滤流之前添加规则:
streaming_client.add_rules(tweepy.StreamRule("Tweepy"))
streaming_client.filter()
StreamingClient.get_rules()
可以检索现有规则。
StreamingClient.delete_rules()
可以删除规则。
要了解如何构建规则,请参阅过滤流文档 Building rules for filtered stream 。
从流中接收的数据被传递给 StreamingClient.on_data()
.
这个方法将数据传递给其他方法。 接收到的推文传递给 StreamingClient.on_tweet()
, includes
数据传递给
StreamingClient.on_includes()
, 错误传递给
StreamingClient.on_errors()
, 匹配规则传递给
StreamingClient.on_matching_rules()
。 一个 StreamResponse
实例
传递给 StreamingClient.on_response()
。默认情况下, 只有 StreamingClient.on_response()
在 DEBUG
logging level 级别下,记录接收到的数据。
自定义流数据的处理, StreamingClient
需要被继承。例如,要打印收到的每条推文ID::
class IDPrinter(tweepy.StreamingClient):
def on_tweet(self, tweet):
print(tweet.id)
printer = IDPrinter("Bearer Token here")
printer.sample()
线程
Stream.filter()
, Stream.sample()
, StreamingClient.filter()
,
和 StreamingClient.sample()
都有一个 线程
参数. 当设置为 True
时,流将在单独的
线程 中运行,该线程由方法调用返回。 例如:
thread = stream.filter(follow=[1072250532645998596], threaded=True)
或:
thread = streaming_client.sample(threaded=True)
错误处理
Stream
和StreamingClient
都有多种方法来处理流期间的错误。
当 Twitter 关闭流时调用 Stream.on_closed()
/ StreamingClient.on_closed()
。
Stream.on_connection_error()
/
StreamingClient.on_connection_error()
在流遇到连接错误时被调用。
Stream.on_request_error()
/ StreamingClient.on_request_error()
在尝试连接到流时遇到错误时调用。
当遇到这些错误,并且没有超过最大重试次数 max_retries
时, Stream
/
StreamingClient
的实例将尝试在适当的时间后重新连接流。 默认情况下,任何版本的这些方法都会记录错误。 如果需要自定义错误处理,可以在子类中继承它们。
class ConnectionTester(tweepy.Stream):
def on_connection_error(self):
self.disconnect()
class ConnectionTester(tweepy.StreamingClient):
def on_connection_error(self):
self.disconnect()
Stream.on_request_error()
/ StreamingClient.on_request_error()
也会传递遇到的 HTTP 状态码。Twitter API 的 HTTP 状态码请参考
https://developer.twitter.com/en/support/twitter-api/error-troubleshooting。
Stream.on_exception()
/ StreamingClient.on_exception()
在发生未处理的异常时被调用。 这对流来说是致命的,默认情况下会记录一个异常。